Implemented background telemetry

This commit is contained in:
Mark Qvist 2023-12-03 02:08:04 +01:00
parent ecf13e0041
commit 19cacd0e86
5 changed files with 179 additions and 42 deletions

View file

@ -30,8 +30,7 @@ if RNS.vendor.platformutils.get_platform() == "android":
from android import python_act
android_api_version = autoclass('android.os.Build$VERSION').SDK_INT
Context = autoclass('android.content.Context')
Intent = autoclass('android.content.Intent')
BitmapFactory = autoclass('android.graphics.BitmapFactory')
Icon = autoclass("android.graphics.drawable.Icon")
@ -60,6 +59,7 @@ class SidebandService():
0x0483: [0x5740], # ST CDC
0x2E8A: [0x0005, 0x000A], # Raspberry Pi Pico
}
def android_notification(self, title="", content="", ticker="", group=None, context_id=None):
if android_api_version < 26:
return
@ -121,12 +121,78 @@ class SidebandService():
built_notification = notification.build()
self.notification_service.notify(0, built_notification)
def check_permission(self, permission):
if RNS.vendor.platformutils.is_android():
try:
result = self.android_service.checkSelfPermission("android.permission."+permission)
if result == 0:
return True
except Exception as e:
RNS.log("Error while checking permission: "+str(e), RNS.LOG_ERROR)
return False
else:
return False
def background_location_allowed(self):
if not RNS.vendor.platformutils.is_android():
return False
perms = ["ACCESS_FINE_LOCATION","ACCESS_COARSE_LOCATION","ACCESS_BACKGROUND_LOCATION"]
for perm in perms:
if not self.check_permission(perm):
return False
return True
def android_location_callback(self, **kwargs):
self._raw_gps = kwargs
self._last_gps_update = time.time()
def should_update_location(self):
if self.sideband.config["telemetry_enabled"] and self.sideband.config["telemetry_s_location"] and self.background_location_allowed():
return True
else:
return False
def update_location_provider(self):
if RNS.vendor.platformutils.is_android():
if self.should_update_location():
if not self._gps_started:
RNS.log("Starting service location provider", RNS.LOG_DEBUG)
if self.gps == None:
from plyer import gps
self.gps = gps
self.gps.configure(on_location=self.android_location_callback)
self.gps.start(minTime=self._gps_stale_time, minDistance=self._gps_min_distance)
self._gps_started = True
else:
if self._gps_started:
RNS.log("Stopping service location provider", RNS.LOG_DEBUG)
if self.gps != None:
self.gps.stop()
self._gps_started = False
self._raw_gps = None
def get_location(self):
return self._last_gps_update, self._raw_gps
def __init__(self):
self.argument = environ.get('PYTHON_SERVICE_ARGUMENT', '')
self.app_dir = self.argument
self.multicast_lock = None
self.wake_lock = None
self.should_run = False
self.gps = None
self._gps_started = False
self._gps_stale_time = 299
self._gps_min_distance = 25
self._raw_gps = None
self.android_service = None
self.app_context = None
@ -149,9 +215,7 @@ class SidebandService():
self.discover_usb_devices()
self.sideband = SidebandCore(self, is_service=True, android_app_dir=self.app_dir, verbose=__debug_build__)
self.sideband.service_context = self.android_service
self.sideband.owner_service = self
self.sideband = SidebandCore(self, is_service=True, android_app_dir=self.app_dir, verbose=__debug_build__, owner_service=self, service_context=self.android_service)
if self.sideband.config["debug"]:
Logger.setLevel(LOG_LEVELS["debug"])
@ -161,6 +225,7 @@ class SidebandService():
if RNS.vendor.platformutils.is_android():
RNS.log("Discovered USB devices: "+str(self.usb_devices), RNS.LOG_EXTREME)
self.update_location_provider()
def discover_usb_devices(self):