From a0a6b0fd552d807cd17a8817c02ff0810d805a69 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Thu, 27 Mar 2025 22:16:54 +0100 Subject: [PATCH] Handle MQTT client memory leak --- sbapp/sideband/core.py | 38 +++++++++++++++++++++++++++++++++----- sbapp/sideband/mqtt.py | 3 +++ 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/sbapp/sideband/core.py b/sbapp/sideband/core.py index 2491e02..161854e 100644 --- a/sbapp/sideband/core.py +++ b/sbapp/sideband/core.py @@ -8,6 +8,7 @@ import sqlite3 import random import shlex import re +import gc import RNS.vendor.umsgpack as msgpack import RNS.Interfaces.Interface as Interface @@ -268,6 +269,7 @@ class SidebandCore(): self.webshare_ssl_cert_path = self.app_dir+"/app_storage/ssl_cert.pem" self.mqtt = None + self.mqtt_handle_lock = threading.Lock() self.first_run = True self.saving_configuration = False @@ -2141,6 +2143,7 @@ class SidebandCore(): dbc = db.cursor() dbc.execute("CREATE TABLE IF NOT EXISTS telemetry (id INTEGER PRIMARY KEY, dest_context BLOB, ts INTEGER, data BLOB)") + dbc.execute("CREATE INDEX IF NOT EXISTS idx_telemetry_ts ON telemetry(ts)") db.commit() def _db_upgradetables(self): @@ -3269,12 +3272,37 @@ class SidebandCore(): self.setstate("app.flags.last_telemetry", time.time()) def mqtt_handle_telemetry(self, context_dest, telemetry): - if self.mqtt == None: - self.mqtt = MQTT() + with self.mqtt_handle_lock: + # TODO: Remove debug + if hasattr(self, "last_mqtt_recycle") and time.time() > self.last_mqtt_recycle + 60*4: + # RNS.log("Recycling MQTT handler", RNS.LOG_DEBUG) + self.mqtt.stop() + self.mqtt.client = None + self.mqtt = None + gc.collect() - self.mqtt.set_server(self.config["telemetry_mqtt_host"], self.config["telemetry_mqtt_port"]) - self.mqtt.set_auth(self.config["telemetry_mqtt_user"], self.config["telemetry_mqtt_pass"]) - self.mqtt.handle(context_dest, telemetry) + if self.mqtt == None: + self.mqtt = MQTT() + self.last_mqtt_recycle = time.time() + + self.mqtt.set_server(self.config["telemetry_mqtt_host"], self.config["telemetry_mqtt_port"]) + self.mqtt.set_auth(self.config["telemetry_mqtt_user"], self.config["telemetry_mqtt_pass"]) + self.mqtt.handle(context_dest, telemetry) + + # TODO: Remove debug + # if not hasattr(self, "memtr"): + # from pympler import muppy + # from pympler import summary + # import resource + # self.res = resource + # self.ms = summary; self.mp = muppy + # self.memtr = self.ms.summarize(self.mp.get_objects()) + # RNS.log(f"RSS: {RNS.prettysize(self.res.getrusage(self.res.RUSAGE_SELF).ru_maxrss*1000)}") + # else: + # memsum = self.ms.summarize(self.mp.get_objects()) + # memdiff = self.ms.get_diff(self.memtr, memsum) + # self.ms.print_(memdiff) + # RNS.log(f"RSS: {RNS.prettysize(self.res.getrusage(self.res.RUSAGE_SELF).ru_maxrss*1000)}") def update_telemetry(self): try: diff --git a/sbapp/sideband/mqtt.py b/sbapp/sideband/mqtt.py index bed0e6d..df55388 100644 --- a/sbapp/sideband/mqtt.py +++ b/sbapp/sideband/mqtt.py @@ -49,6 +49,9 @@ class MQTT(): time.sleep(MQTT.SCHEDULER_SLEEP) + try: self.disconnect() + except Exception as e: RNS.log(f"An error occurred while disconnecting MQTT server: {e}", RNS.LOG_ERROR) + RNS.log("Stopped MQTT scheduler", RNS.LOG_DEBUG) def connect_failed(self, client, userdata):