Added in-app BLE scanning and pairing

This commit is contained in:
Mark Qvist 2025-07-14 16:06:50 +02:00
parent 0d2f7b25a3
commit 73601ebe1e
2 changed files with 197 additions and 11 deletions

View file

@ -277,6 +277,9 @@ else:
from kivymd.utils.set_bars_colors import set_bars_colors
android_api_version = autoclass('android.os.Build$VERSION').SDK_INT
from android.broadcast import BroadcastReceiver
BluetoothAdapter = autoclass('android.bluetooth.BluetoothAdapter')
else:
from .sideband.core import SidebandCore
import sbapp.plyer as plyer
@ -408,6 +411,9 @@ class SidebandApp(MDApp):
self.repository_url = None
self.rnode_flasher_url = None
self.bt_adapter = None
self.discovered_bt_devices = {}
self.bt_bonded_devices = []
#################################################
# Application Startup #
@ -1008,6 +1014,65 @@ class SidebandApp(MDApp):
self.check_bluetooth_permissions()
def bluetooth_update_bonded_devices(self, sender=None):
if self.bt_adapter == None: self.bt_adapter = BluetoothAdapter.getDefaultAdapter()
self.bt_bonded_devices = []
for device in self.bt_adapter.getBondedDevices():
device_addr = device.getAddress()
self.bt_bonded_devices.append(device_addr)
RNS.log(f"Updated bonded devices: {self.bt_bonded_devices}", RNS.LOG_DEBUG)
def bluetooth_scan_action(self, sender=None):
toast("Starting Bluetooth scan...")
self.start_bluetooth_scan()
def start_bluetooth_scan(self):
self.check_bluetooth_permissions()
if not self.sideband.getpersistent("permissions.bluetooth"):
self.request_bluetooth_permissions()
else:
RNS.log("Starting bluetooth scan", RNS.LOG_DEBUG)
self.discovered_bt_devices = {}
BluetoothDevice = autoclass('android.bluetooth.BluetoothDevice')
self.bt_found_action = BluetoothDevice.ACTION_FOUND
self.broadcast_receiver = BroadcastReceiver(self.on_broadcast, actions=[self.bt_found_action])
self.broadcast_receiver.start()
self.bt_adapter = BluetoothAdapter.getDefaultAdapter()
self.bluetooth_update_bonded_devices()
self.bt_adapter.startDiscovery()
def stop_bluetooth_scan(self):
RNS.log("Stopping bluetooth scan", RNS.LOG_DEBUG)
self.check_bluetooth_permissions()
if not self.sideband.getpersistent("permissions.bluetooth"):
self.request_bluetooth_permissions()
else:
self.bt_adapter = BluetoothAdapter.getDefaultAdapter()
self.bt_adapter.cancelDiscovery()
def on_broadcast(self, context, intent):
BluetoothDevice = autoclass('android.bluetooth.BluetoothDevice')
action = intent.getAction()
extras = intent.getExtras()
if str(action) == "android.bluetooth.device.action.FOUND":
if extras:
try:
device = intent.getParcelableExtra("android.bluetooth.device.extra.DEVICE", BluetoothDevice)
dev_name = device.getName()
dev_addr = device.getAddress()
if dev_name.startswith("RNode "):
dev_rssi = extras.getShort("android.bluetooth.device.extra.RSSI", -9999)
discovered_device = {"name": dev_name, "address": dev_addr, "rssi": dev_rssi, "discovered": time.time()}
self.discovered_bt_devices[dev_addr] = discovered_device
RNS.log(f"Discovered RNode: {discovered_device}", RNS.LOG_DEBUG)
except Exception as e:
RNS.log(f"Error while mapping discovered device: {e}", RNS.LOG_ERROR)
def on_new_intent(self, intent):
intent_action = intent.getAction()
action = None
@ -4250,6 +4315,63 @@ class SidebandApp(MDApp):
self.sideband.save_configuration()
def hardware_rnode_scan_job(self):
time.sleep(1.25)
added_devices = []
scan_timeout = time.time()+16
while time.time() < scan_timeout:
RNS.log("Scanning...", RNS.LOG_DEBUG)
for device_addr in self.discovered_bt_devices:
if device_addr not in added_devices and not device_addr in self.bt_bonded_devices:
new_device = self.discovered_bt_devices[device_addr]
added_devices.append(device_addr)
RNS.log(f"Adding device: {new_device}")
def add_factory(add_device):
def add_job(dt):
pair_addr = add_device["address"]
btn_text = "Pair "+add_device["name"]
def run_pair(sender): self.hardware_rnode_pair_device_action(pair_addr)
# device_button = MDRectangleFlatButton(text=btn_text,font_size=dp(16))
device_button = MDRectangleFlatButton(text=btn_text, font_size=dp(16), padding=[dp(0), dp(14), dp(0), dp(14)], size_hint=[1.0, None])
device_button.bind(on_release=run_pair)
self.hardware_rnode_screen.ids.rnode_scan_results.add_widget(device_button)
return add_job
Clock.schedule_once(add_factory(new_device), 0.1)
time.sleep(2)
def job(dt):
self.hardware_rnode_screen.ids.hardware_rnode_bt_scan_button.disabled = False
self.hardware_rnode_screen.ids.hardware_rnode_bt_scan_button.text = "Pair New Device"
Clock.schedule_once(job, 0.2)
if len(added_devices) == 0:
def job(dt): toast("No unpaired RNodes discovered")
Clock.schedule_once(job, 0.2)
def hardware_rnode_pair_device_action(self, pair_addr):
RNS.log(f"Pair action for {pair_addr}", RNS.LOG_DEBUG)
self.stop_bluetooth_scan()
BluetoothSocket = autoclass('android.bluetooth.BluetoothSocket')
if self.bt_adapter == None: self.bt_adapter = BluetoothAdapter.getDefaultAdapter()
addr_bytes = bytes.fromhex(pair_addr.replace(":", ""))
remote_device = self.bt_adapter.getRemoteDevice(addr_bytes)
RNS.log(f"Remote device: {remote_device}", RNS.LOG_DEBUG)
remote_device.createBond()
RNS.log("Create bond call returned", RNS.LOG_DEBUG)
def hardware_rnode_bt_scan_action(self, sender=None):
self.discovered_bt_devices = {}
self.hardware_rnode_screen.ids.hardware_rnode_bt_scan_button.disabled = True
self.hardware_rnode_screen.ids.hardware_rnode_bt_scan_button.text = "Scanning..."
rw = []
for child in self.hardware_rnode_screen.ids.rnode_scan_results.children: rw.append(child)
for w in rw: self.hardware_rnode_screen.ids.rnode_scan_results.remove_widget(w)
Clock.schedule_once(self.bluetooth_scan_action, 0.5)
threading.Thread(target=self.hardware_rnode_scan_job, daemon=True).start()
def hardware_rnode_bt_on_action(self, sender=None):
self.hardware_rnode_screen.ids.hardware_rnode_bt_pair_button.disabled = True
self.hardware_rnode_screen.ids.hardware_rnode_bt_on_button.disabled = True