Added voice call service base

This commit is contained in:
Mark Qvist 2025-03-09 14:29:02 +01:00
parent 902e1c5451
commit a0a03c9eba
5 changed files with 380 additions and 3 deletions

View File

@ -239,6 +239,7 @@ else:
from ui.conversations import Conversations, MsgSync, NewConv
from ui.telemetry import Telemetry
from ui.utilities import Utilities
from ui.voice import Voice
from ui.objectdetails import ObjectDetails
from ui.announces import Announces
from ui.messages import Messages, ts_format, messages_screen_kv
@ -267,6 +268,7 @@ else:
from .ui.announces import Announces
from .ui.telemetry import Telemetry
from .ui.utilities import Utilities
from .ui.voice import Voice
from .ui.objectdetails import ObjectDetails
from .ui.messages import Messages, ts_format, messages_screen_kv
from .ui.helpers import ContentNavigationDrawer, DrawerList, IconListItem
@ -352,6 +354,7 @@ class SidebandApp(MDApp):
self.settings_ready = False
self.telemetry_ready = False
self.utilities_ready = False
self.voice_ready = False
self.connectivity_ready = False
self.hardware_ready = False
self.repository_ready = False
@ -3148,6 +3151,15 @@ class SidebandApp(MDApp):
self.sideband.config["hq_ptt"] = self.settings_screen.ids.settings_hq_ptt.active
self.sideband.save_configuration()
def save_voice_enabled(sender=None, event=None):
self.sideband.config["voice_enabled"] = self.settings_screen.ids.settings_voice_enabled.active
self.sideband.save_configuration()
if self.sideband.config["voice_enabled"] == True:
self.sideband.start_voice()
else:
self.sideband.stop_voice()
def save_print_command(sender=None, event=None):
if not sender.focus:
in_cmd = self.settings_screen.ids.settings_print_command.text
@ -3323,6 +3335,9 @@ class SidebandApp(MDApp):
self.settings_screen.ids.settings_hq_ptt.active = self.sideband.config["hq_ptt"]
self.settings_screen.ids.settings_hq_ptt.bind(active=save_hq_ptt)
self.settings_screen.ids.settings_voice_enabled.active = self.sideband.config["voice_enabled"]
self.settings_screen.ids.settings_voice_enabled.bind(active=save_voice_enabled)
self.settings_screen.ids.settings_debug.active = self.sideband.config["debug"]
self.settings_screen.ids.settings_debug.bind(active=save_debug)
@ -5234,6 +5249,44 @@ class SidebandApp(MDApp):
self.utilities_action(direction="right")
### voice Screen
######################################
def voice_init(self):
if not self.voice_ready:
self.voice_screen = Voice(self)
self.voice_ready = True
def voice_open(self, sender=None, direction="left", no_transition=False):
if no_transition:
self.root.ids.screen_manager.transition = self.no_transition
else:
self.root.ids.screen_manager.transition = self.slide_transition
self.root.ids.screen_manager.transition.direction = direction
self.root.ids.screen_manager.current = "voice_screen"
self.root.ids.nav_drawer.set_state("closed")
self.sideband.setstate("app.displaying", self.root.ids.screen_manager.current)
if no_transition:
self.root.ids.screen_manager.transition = self.slide_transition
def voice_action(self, sender=None, direction="left"):
if self.voice_ready:
self.voice_open(direction=direction)
else:
self.loader_action(direction=direction)
def final(dt):
self.voice_init()
def o(dt):
self.voice_open(no_transition=True)
Clock.schedule_once(o, ll_ot)
Clock.schedule_once(final, ll_ft)
def close_sub_voice_action(self, sender=None):
self.voice_action(direction="right")
### Telemetry Screen
######################################

View File

@ -167,6 +167,7 @@ class SidebandCore():
self.owner_app = owner_app
self.reticulum = None
self.webshare_server = None
self.voice_running = False
self.telemeter = None
self.telemetry_running = False
self.latest_telemetry = None
@ -531,6 +532,9 @@ class SidebandCore():
self.config["telemetry_send_to_trusted"] = False
self.config["telemetry_send_to_collector"] = False
# Voice
self.config["voice_enabled"] = False
if not os.path.isfile(self.db_path):
self.__db_init()
else:
@ -837,6 +841,9 @@ class SidebandCore():
if not "map_storage_file" in self.config:
self.config["map_storage_file"] = None
if not "voice_enabled" in self.config:
self.config["voice_enabled"] = False
# Make sure we have a database
if not os.path.isfile(self.db_path):
self.__db_init()
@ -3715,8 +3722,8 @@ class SidebandCore():
self.periodic_thread.start()
if self.is_standalone or self.is_client:
if self.config["telemetry_enabled"]:
self.run_telemetry()
if self.config["telemetry_enabled"]: self.run_telemetry()
if self.config["voice_enabled"]: self.start_voice()
elif self.is_service:
self.run_service_telemetry()
@ -5183,6 +5190,36 @@ class SidebandCore():
if not self.reticulum.is_connected_to_shared_instance:
RNS.Transport.detach_interfaces()
def start_voice(self):
try:
if not self.voice_running:
RNS.log("Starting voice service", RNS.LOG_DEBUG)
self.voice_running = True
from .voice import ReticulumTelephone
self.telephone = ReticulumTelephone(self.identity)
ringtone_path = os.path.join(self.asset_dir, "audio", "notifications", "soft1.opus")
self.telephone.set_ringtone(ringtone_path)
except Exception as e:
self.voice_running = False
RNS.log(f"An error occurred while starting voice services, the contained exception was: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
def stop_voice(self):
try:
if self.voice_running:
RNS.log("Stopping voice service", RNS.LOG_DEBUG)
if self.telephone:
self.telephone.stop()
del self.telephone
self.telephone = None
self.voice_running = False
except Exception as e:
RNS.log(f"An error occurred while stopping voice services, the contained exception was: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
rns_config = """# This template is used to generate a
# running configuration for Sideband's
# internal RNS instance. Incorrect changes

153
sbapp/sideband/voice.py Normal file
View File

@ -0,0 +1,153 @@
import RNS
import os
import sys
import time
from LXST._version import __version__
from LXST.Primitives.Telephony import Telephone
from RNS.vendor.configobj import ConfigObj
class ReticulumTelephone():
STATE_AVAILABLE = 0x00
STATE_CONNECTING = 0x01
STATE_RINGING = 0x02
STATE_IN_CALL = 0x03
HW_SLEEP_TIMEOUT = 15
HW_STATE_IDLE = 0x00
HW_STATE_DIAL = 0x01
HW_STATE_SLEEP = 0xFF
RING_TIME = 30
WAIT_TIME = 60
PATH_TIME = 10
def __init__(self, identity, verbosity = 0, service = False):
self.identity = identity
self.service = service
self.config = None
self.should_run = False
self.telephone = None
self.state = self.STATE_AVAILABLE
self.hw_state = self.HW_STATE_IDLE
self.hw_last_event = time.time()
self.hw_input = ""
self.direction = None
self.last_input = None
self.first_run = False
self.ringtone_path = None
self.speaker_device = None
self.microphone_device = None
self.ringer_device = None
self.phonebook = {}
self.aliases = {}
self.names = {}
self.telephone = Telephone(self.identity, ring_time=self.RING_TIME, wait_time=self.WAIT_TIME)
self.telephone.set_ringing_callback(self.ringing)
self.telephone.set_established_callback(self.call_established)
self.telephone.set_ended_callback(self.call_ended)
self.telephone.set_speaker(self.speaker_device)
self.telephone.set_microphone(self.microphone_device)
self.telephone.set_ringer(self.ringer_device)
RNS.log(f"{self} initialised", RNS.LOG_DEBUG)
def set_ringtone(self, ringtone_path):
if os.path.isfile(ringtone_path):
self.ringtone_path = ringtone_path
self.telephone.set_ringtone(self.ringtone_path)
@property
def is_available(self):
return self.state == self.STATE_AVAILABLE
@property
def is_in_call(self):
return self.state == self.STATE_IN_CALL
@property
def is_ringing(self):
return self.state == self.STATE_RINGING
@property
def call_is_connecting(self):
return self.state == self.STATE_CONNECTING
@property
def hw_is_idle(self):
return self.hw_state == self.HW_STATE_IDLE
@property
def hw_is_dialing(self):
return self.hw_state == self.HW_STATE_DIAL
def start(self):
if not self.should_run:
self.telephone.announce()
self.should_run = True
self.run()
def stop(self):
self.should_run = False
self.telephone.teardown()
self.telephone = None
def dial(self, identity_hash):
self.last_dialled_identity_hash = identity_hash
self.telephone.set_busy(True)
identity_hash = bytes.fromhex(identity_hash)
destination_hash = RNS.Destination.hash_from_name_and_identity("lxst.telephony", identity_hash)
if not RNS.Transport.has_path(destination_hash):
RNS.Transport.request_path(destination_hash)
def spincheck(): return RNS.Transport.has_path(destination_hash)
self.__spin(spincheck, "Requesting path for call to "+RNS.prettyhexrep(identity_hash), self.path_time)
if not spincheck(): RNS.log("Path request timed out", RNS.LOG_DEBUG)
self.telephone.set_busy(False)
if RNS.Transport.has_path(destination_hash):
call_hops = RNS.Transport.hops_to(destination_hash)
cs = "" if call_hops == 1 else "s"
RNS.log(f"Connecting call over {call_hops} hop{cs}...", RNS.LOG_DEBUG)
identity = RNS.Identity.recall(destination_hash)
self.call(identity)
else:
pass
def redial(self, args=None):
if self.last_dialled_identity_hash: self.dial(self.last_dialled_identity_hash)
def call(self, remote_identity):
RNS.log(f"Calling {RNS.prettyhexrep(remote_identity.hash)}...", RNS.LOG_DEBUG)
self.state = self.STATE_CONNECTING
self.caller = remote_identity
self.direction = "to"
self.telephone.call(self.caller)
def ringing(self, remote_identity):
if self.hw_state == self.HW_STATE_SLEEP: self.hw_state = self.HW_STATE_IDLE
self.state = self.STATE_RINGING
self.caller = remote_identity
self.direction = "from" if self.direction == None else "to"
RNS.log(f"Incoming call from {RNS.prettyhexrep(self.caller.hash)}", RNS.LOG_DEBUG)
def call_ended(self, remote_identity):
if self.is_in_call or self.is_ringing or self.call_is_connecting:
if self.is_in_call: RNS.log(f"Call with {RNS.prettyhexrep(self.caller.hash)} ended\n", RNS.LOG_DEBUG)
if self.is_ringing: RNS.log(f"Call {self.direction} {RNS.prettyhexrep(self.caller.hash)} was not answered\n", RNS.LOG_DEBUG)
if self.call_is_connecting: RNS.log(f"Call to {RNS.prettyhexrep(self.caller.hash)} could not be connected\n", RNS.LOG_DEBUG)
self.direction = None
self.state = self.STATE_AVAILABLE
def call_established(self, remote_identity):
if self.call_is_connecting or self.is_ringing:
self.state = self.STATE_IN_CALL
RNS.log(f"Call established with {RNS.prettyhexrep(self.caller.hash)}", RNS.LOG_DEBUG)
def __spin(self, until=None, msg=None, timeout=None):
if msg: RNS.log(msg, RNS.LOG_DEBUG)
if timeout != None: timeout = time.time()+timeout
while (timeout == None or time.time()<timeout) and not until(): time.sleep(0.1)
if timeout != None and time.time() > timeout:
return False
else:
return True

View File

@ -96,6 +96,16 @@ MDNavigationLayout:
IconLeftWidget:
icon: "account-voice"
on_release: root.ids.screen_manager.app.announces_action(self)
OneLineIconListItem:
text: "Voice"
on_release: root.ids.screen_manager.app.voice_action(self)
# _no_ripple_effect: True
IconLeftWidget:
icon: "phone-in-talk"
on_release: root.ids.screen_manager.app.voice_action(self)
# OneLineIconListItem:
@ -1790,7 +1800,7 @@ MDScreen:
height: dp(48)
MDLabel:
text: "Use high-quality voice for PTT"
text: "High-quality codec for LXMF PTT"
font_style: "H6"
MDSwitch:
@ -1799,6 +1809,22 @@ MDScreen:
disabled: False
active: False
MDBoxLayout:
orientation: "horizontal"
size_hint_y: None
padding: [0,0,dp(24),dp(0)]
height: dp(48)
MDLabel:
text: "Enable voice calls"
font_style: "H6"
MDSwitch:
id: settings_voice_enabled
pos_hint: {"center_y": 0.3}
disabled: False
active: False
# MDBoxLayout:
# orientation: "horizontal"
# size_hint_y: None

108
sbapp/ui/voice.py Normal file
View File

@ -0,0 +1,108 @@
import time
import RNS
from typing import Union
from kivy.metrics import dp,sp
from kivy.lang.builder import Builder
from kivy.core.clipboard import Clipboard
from kivy.utils import escape_markup
from kivymd.uix.recycleview import MDRecycleView
from kivymd.uix.list import OneLineIconListItem
from kivymd.uix.pickers import MDColorPicker
from kivymd.uix.button import MDRectangleFlatButton
from kivymd.uix.dialog import MDDialog
from kivymd.icon_definitions import md_icons
from kivymd.toast import toast
from kivy.properties import StringProperty, BooleanProperty
from kivy.effects.scroll import ScrollEffect
from kivy.clock import Clock
from sideband.sense import Telemeter
import threading
from datetime import datetime
if RNS.vendor.platformutils.get_platform() == "android":
from ui.helpers import ts_format
from android.permissions import request_permissions, check_permission
else:
from .helpers import ts_format
class Voice():
def __init__(self, app):
self.app = app
self.screen = None
self.rnstatus_screen = None
self.rnstatus_instance = None
self.logviewer_screen = None
if not self.app.root.ids.screen_manager.has_screen("voice_screen"):
self.screen = Builder.load_string(layout_voice_screen)
self.screen.app = self.app
self.screen.delegate = self
self.app.root.ids.screen_manager.add_widget(self.screen)
self.screen.ids.voice_scrollview.effect_cls = ScrollEffect
info = "Voice services UI"
info += ""
if self.app.theme_cls.theme_style == "Dark":
info = "[color=#"+self.app.dark_theme_text_color+"]"+info+"[/color]"
self.screen.ids.voice_info.text = info
layout_voice_screen = """
MDScreen:
name: "voice_screen"
BoxLayout:
orientation: "vertical"
MDTopAppBar:
title: "Voice"
anchor_title: "left"
elevation: 0
left_action_items:
[['menu', lambda x: root.app.nav_drawer.set_state("open")]]
right_action_items:
[
['close', lambda x: root.app.close_any_action(self)],
]
ScrollView:
id: voice_scrollview
MDBoxLayout:
orientation: "vertical"
size_hint_y: None
height: self.minimum_height
padding: [dp(28), dp(32), dp(28), dp(16)]
# MDLabel:
# text: "Utilities & Tools"
# font_style: "H6"
MDLabel:
id: voice_info
markup: True
text: ""
size_hint_y: None
text_size: self.width, None
height: self.texture_size[1]
MDBoxLayout:
orientation: "vertical"
spacing: "24dp"
size_hint_y: None
height: self.minimum_height
padding: [dp(0), dp(35), dp(0), dp(35)]
MDRectangleFlatIconButton:
id: rnstatus_button
icon: "wifi-check"
text: "Reticulum Status"
padding: [dp(0), dp(14), dp(0), dp(14)]
icon_size: dp(24)
font_size: dp(16)
size_hint: [1.0, None]
on_release: root.delegate.rnstatus_action(self)
disabled: False
"""