mirror of
https://github.com/markqvist/Sideband.git
synced 2025-08-03 20:14:24 -04:00
Added periodic telemetry data cleaning
This commit is contained in:
parent
3b2e1adaf2
commit
b4a063a4e7
3 changed files with 28 additions and 131 deletions
|
@ -2809,7 +2809,7 @@ class SidebandApp(MDApp):
|
|||
|
||||
str_comps = " - [b]Reticulum[/b] (MIT License)\n - [b]LXMF[/b] (MIT License)\n - [b]KivyMD[/b] (MIT License)"
|
||||
str_comps += "\n - [b]Kivy[/b] (MIT License)\n - [b]Codec2[/b] (LGPL License)\n - [b]PyCodec2[/b] (BSD-3 License)"
|
||||
str_comps += "\n - [b]PyDub[/b] (MIT License)\n - [b]PyOgg[/b] (Public Domain)\n - [b]MD2bbcode[/b] (GPL3 License)"
|
||||
str_comps += "\n - [b]PyDub[/b] (MIT License)\n - [b]PyOgg[/b] (Public Domain)\n - [b]FFmpeg[/b] (GPL3 License)\n - [b]MD2bbcode[/b] (GPL3 License)"
|
||||
str_comps += "\n - [b]GeoidHeight[/b] (LGPL License)\n - [b]Paho MQTT[/b] (EPL2 License)\n - [b]Python[/b] (PSF License)"
|
||||
str_comps += "\n\nGo to [u][ref=link]https://unsigned.io/donate[/ref][/u] to support the project.\n\nThe Sideband app is Copyright © 2025 Mark Qvist / unsigned.io\n\nPermission is granted to freely share and distribute binary copies of "+self.root.ids.app_version_info.text+", so long as no payment or compensation is charged for said distribution or sharing.\n\nIf you were charged or paid anything for this copy of Sideband, please report it to [b]license@unsigned.io[/b].\n\nTHIS IS EXPERIMENTAL SOFTWARE - SIDEBAND COMES WITH ABSOLUTELY NO WARRANTY - USE AT YOUR OWN RISK AND RESPONSIBILITY"
|
||||
info = "This is "+self.root.ids.app_version_info.text+", on RNS v"+RNS.__version__+" and LXMF v"+LXMF.__version__+".\n\nHumbly build using the following open components:\n\n"+str_comps
|
||||
|
|
|
@ -113,8 +113,10 @@ class SidebandCore():
|
|||
SERVICE_JOB_INTERVAL = 1
|
||||
PERIODIC_JOBS_INTERVAL = 60
|
||||
PERIODIC_SYNC_RETRY = 360
|
||||
TELEMETRY_KEEP = 60*60*24*7
|
||||
TELEMETRY_INTERVAL = 60
|
||||
SERVICE_TELEMETRY_INTERVAL = 300
|
||||
TELEMETRY_CLEAN_INTERVAL = 3600
|
||||
|
||||
IF_CHANGE_ANNOUNCE_MIN_INTERVAL = 3.5 # In seconds
|
||||
AUTO_ANNOUNCE_RANDOM_MIN = 90 # In minutes
|
||||
|
@ -174,6 +176,8 @@ class SidebandCore():
|
|||
self.pending_telemetry_send_try = 0
|
||||
self.pending_telemetry_send_maxtries = 2
|
||||
self.telemetry_send_blocked_until = 0
|
||||
self.telemetry_clean_interval = self.TELEMETRY_CLEAN_INTERVAL
|
||||
self.last_telemetry_clean = 0
|
||||
self.pending_telemetry_request = False
|
||||
self.telemetry_request_max_history = 7*24*60*60
|
||||
self.live_tracked_objects = {}
|
||||
|
@ -2717,7 +2721,7 @@ class SidebandCore():
|
|||
db.commit()
|
||||
|
||||
def _db_clean_messages(self):
|
||||
RNS.log("Purging stale messages... "+str(self.db_path))
|
||||
RNS.log("Purging stale messages... ", RNS.LOG_DEBUG)
|
||||
with self.db_lock:
|
||||
db = self.__db_connect()
|
||||
dbc = db.cursor()
|
||||
|
@ -2726,6 +2730,20 @@ class SidebandCore():
|
|||
dbc.execute(query, {"outbound_state": LXMF.LXMessage.OUTBOUND, "sending_state": LXMF.LXMessage.SENDING})
|
||||
db.commit()
|
||||
|
||||
def _db_clean_telemetry(self):
|
||||
RNS.log("Cleaning telemetry... ", RNS.LOG_DEBUG)
|
||||
clean_time = time.time()-self.TELEMETRY_KEEP
|
||||
with self.db_lock:
|
||||
db = self.__db_connect()
|
||||
dbc = db.cursor()
|
||||
|
||||
query = f"delete from telemetry where (ts < {clean_time});"
|
||||
dbc.execute(query, {"outbound_state": LXMF.LXMessage.OUTBOUND, "sending_state": LXMF.LXMessage.SENDING})
|
||||
db.commit()
|
||||
|
||||
self.last_telemetry_clean = time.time()
|
||||
|
||||
|
||||
def _db_message_set_state(self, lxm_hash, state, is_retry=False, ratchet_id=None, originator_stamp=None):
|
||||
msg_extras = None
|
||||
if ratchet_id != None:
|
||||
|
@ -3525,6 +3543,9 @@ class SidebandCore():
|
|||
self.setpersistent("lxmf.syncretrying", False)
|
||||
|
||||
if self.config["telemetry_enabled"]:
|
||||
if time.time()-self.last_telemetry_clean > self.telemetry_clean_interval:
|
||||
self._db_clean_telemetry()
|
||||
|
||||
if self.config["telemetry_send_to_collector"]:
|
||||
if self.config["telemetry_collector"] != None and self.config["telemetry_collector"] != self.lxmf_destination.hash:
|
||||
try:
|
||||
|
@ -4783,6 +4804,7 @@ class SidebandCore():
|
|||
|
||||
def start(self):
|
||||
self._db_clean_messages()
|
||||
self._db_clean_telemetry()
|
||||
self.__start_jobs_immediate()
|
||||
|
||||
thread = threading.Thread(target=self.__start_jobs_deferred)
|
||||
|
@ -5008,6 +5030,10 @@ class SidebandCore():
|
|||
RNS.log("Error while handling commands: "+str(e), RNS.LOG_ERROR)
|
||||
|
||||
def create_telemetry_collector_response(self, to_addr, timebase, is_authorized_telemetry_request=False):
|
||||
if self.getstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending") == True:
|
||||
RNS.log("Not sending new telemetry collector response, since an earlier transfer is already in progress", RNS.LOG_DEBUG)
|
||||
return "in_progress"
|
||||
|
||||
added_sources = {}
|
||||
sources = self.list_telemetry(after=timebase)
|
||||
only_latest = self.config["telemetry_requests_only_send_latest"]
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue