Added discovery support for Weave interface

This commit is contained in:
Mark Qvist 2026-01-01 14:49:40 +01:00
parent 994c4fd699
commit e56e80aade

View file

@ -19,6 +19,7 @@ BANDWIDTH = 0x0A
SPREADINGFACTOR = 0x0B
CODINGRATE = 0x0C
MODULATION = 0x0D
CHANNEL = 0x0E
APP_NAME = "rnstransport"
@ -27,7 +28,8 @@ class InterfaceAnnouncer():
DEFAULT_STAMP_VALUE = 14
WORKBLOCK_EXPAND_ROUNDS = 20
DISCOVERABLE_INTERFACE_TYPES = ["BackboneInterface", "TCPServerInterface", "TCPClientInterface", "RNodeInterface", "I2PInterface", "KISSInterface"]
DISCOVERABLE_INTERFACE_TYPES = ["BackboneInterface", "TCPServerInterface", "TCPClientInterface",
"RNodeInterface", "WeaveInterface", "I2PInterface", "KISSInterface"]
def __init__(self, owner):
import importlib.util
@ -64,11 +66,11 @@ class InterfaceAnnouncer():
if len(due_interfaces) > 0:
selected_interface = due_interfaces[0]
selected_interface.last_discovery_announce = time.time()
RNS.log(f"Preparing interface discovery announce for {selected_interface.name}", RNS.LOG_VERBOSE)
RNS.log(f"Preparing interface discovery announce for {selected_interface.name}", RNS.LOG_DEBUG)
app_data = self.get_interface_announce_data(selected_interface)
if not app_data: RNS.log(f"Could not generate interface discovery announce data for {selected_interface.name}", RNS.LOG_ERROR)
else:
RNS.log(f"Sending interface discovery announce for {selected_interface.name}", RNS.LOG_VERBOSE)
RNS.log(f"Sending interface discovery announce for {selected_interface.name} with {len(app_data)}B payload", RNS.LOG_DEBUG)
self.discovery_destination.announce(app_data=app_data)
except Exception as e:
@ -86,12 +88,13 @@ class InterfaceAnnouncer():
stamp_value = interface.discovery_stamp_value if interface.discovery_stamp_value else self.DEFAULT_STAMP_VALUE
if not interface_type in self.DISCOVERABLE_INTERFACE_TYPES: return None
else:
info = {INTERFACE_TYPE: interface_type,
TRANSPORT: RNS.Reticulum.transport_enabled(),
NAME: self.sanitize(interface.discovery_name),
LATITUDE: interface.discovery_latitude,
LONGITUDE: interface.discovery_longitude,
HEIGHT: interface.discovery_height}
flags = bytes([0x00])
info = {INTERFACE_TYPE: interface_type,
TRANSPORT: RNS.Reticulum.transport_enabled(),
NAME: self.sanitize(interface.discovery_name),
LATITUDE: interface.discovery_latitude,
LONGITUDE: interface.discovery_longitude,
HEIGHT: interface.discovery_height}
if interface_type in ["BackboneInterface", "TCPServerInterface"]:
info[REACHABLE_ON] = self.sanitize(interface.reachable_on)
@ -106,6 +109,12 @@ class InterfaceAnnouncer():
info[SPREADINGFACTOR] = interface.sf
info[CODINGRATE] = interface.cr
if interface_type == "WeaveInterface":
info[FREQUENCY] = interface.discovery_frequency
info[BANDWIDTH] = interface.discovery_bandwidth
info[CHANNEL] = interface.discovery_channel
info[MODULATION] = interface.discovery_modulation
if interface_type == "KISSInterface" or (interface_type == "TCPClientInterface" and interface.kiss_framing):
info[INTERFACE_TYPE] = "KISSInterface"
info[FREQUENCY] = interface.discovery_frequency
@ -119,13 +128,13 @@ class InterfaceAnnouncer():
packed = msgpack.packb(info)
infohash = RNS.Identity.full_hash(packed)
if infohash in self.stamp_cache: return packed+self.stamp_cache[infohash]
if infohash in self.stamp_cache: return flags+packed+self.stamp_cache[infohash]
else: stamp, v = self.stamper.generate_stamp(infohash, stamp_cost=stamp_value, expand_rounds=self.WORKBLOCK_EXPAND_ROUNDS)
if not stamp: return None
else:
self.stamp_cache[infohash] = stamp
return bytes([0x00])+packed+stamp
return flags+packed+stamp
class InterfaceAnnounceHandler:
def __init__(self, required_value=InterfaceAnnouncer.DEFAULT_STAMP_VALUE, callback=None):
@ -224,6 +233,20 @@ class InterfaceAnnounceHandler:
cfg_identity_str = f"\n transport_identity = {cfg_identity}"
info["config_entry"] = f"[[{cfg_name}]]\n type = RNodeInterface\n enabled = yes\n port = \n frequency = {cfg_frequency}\n bandwidth = {cfg_bandwidth}\n spreadingfactor = {cfg_sf}\n codingrate = {cfg_cr}\n txpower = {cfg_netname_str}{cfg_netkey_str}"
if interface_type == "WeaveInterface":
info["frequency"] = unpacked[FREQUENCY]
info["bandwidth"] = unpacked[BANDWIDTH]
info["channel"] = unpacked[CHANNEL]
info["modulation"] = unpacked[MODULATION]
cfg_name = info["name"]
cfg_identity = info["identity"]
cfg_netname = info["ifac_netname"] if "ifac_netname" in info else None
cfg_netkey = info["ifac_netkey"] if "ifac_netkey" in info else None
cfg_netname_str = f"\n network_name = {cfg_netname}" if cfg_netname else ""
cfg_netkey_str = f"\n passphrase = {cfg_netkey}" if cfg_netkey else ""
cfg_identity_str = f"\n transport_identity = {cfg_identity}"
info["config_entry"] = f"[[{cfg_name}]]\n type = WeaveInterface\n enabled = yes\n port = {cfg_netname_str}{cfg_netkey_str}"
if interface_type == "KISSInterface":
info["frequency"] = unpacked[FREQUENCY]
info["bandwidth"] = unpacked[BANDWIDTH]