From 16108c579deb17964f3603c7253454b711e9ccd0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Mar 2021 14:05:01 +0000 Subject: [PATCH] Fix SQL delta file taking a long time to run (#9516) Fixes #9504 --- changelog.d/9516.bugfix | 1 + scripts/synapse_port_db | 2 + synapse/storage/databases/main/pusher.py | 53 +++++++++++++++++++ ...elete_pushers_for_deactivated_accounts.sql | 9 ++-- 4 files changed, 60 insertions(+), 5 deletions(-) create mode 100644 changelog.d/9516.bugfix diff --git a/changelog.d/9516.bugfix b/changelog.d/9516.bugfix new file mode 100644 index 000000000..81188c547 --- /dev/null +++ b/changelog.d/9516.bugfix @@ -0,0 +1 @@ +Fix a bug where users' pushers were not all deleted when they deactivated their account. diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index d2aaea08f..58edf6af6 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -47,6 +47,7 @@ from synapse.storage.databases.main.events_bg_updates import ( from synapse.storage.databases.main.media_repository import ( MediaRepositoryBackgroundUpdateStore, ) +from synapse.storage.databases.main.pusher import PusherWorkerStore from synapse.storage.databases.main.registration import ( RegistrationBackgroundUpdateStore, find_max_generated_user_id_localpart, @@ -177,6 +178,7 @@ class Store( UserDirectoryBackgroundUpdateStore, EndToEndKeyBackgroundStore, StatsStore, + PusherWorkerStore, ): def execute(self, f, *args, **kwargs): return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs) diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index 74219cb05..6b608ebc9 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -39,6 +39,11 @@ class PusherWorkerStore(SQLBaseStore): db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")] ) + self.db_pool.updates.register_background_update_handler( + "remove_deactivated_pushers", + self._remove_deactivated_pushers, + ) + def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]: """JSON-decode the data in the rows returned from the `pushers` table @@ -284,6 +289,54 @@ class PusherWorkerStore(SQLBaseStore): lock=False, ) + async def _remove_deactivated_pushers(self, progress: dict, batch_size: int) -> int: + """A background update that deletes all pushers for deactivated users. + + Note that we don't proacively tell the pusherpool that we've deleted + these (just because its a bit off a faff to do from here), but they will + get cleaned up at the next restart + """ + + last_user = progress.get("last_user", "") + + def _delete_pushers(txn) -> int: + + sql = """ + SELECT name FROM users + WHERE deactivated = ? and name > ? + ORDER BY name ASC + LIMIT ? + """ + + txn.execute(sql, (1, last_user, batch_size)) + users = [row[0] for row in txn] + + self.db_pool.simple_delete_many_txn( + txn, + table="pushers", + column="user_name", + iterable=users, + keyvalues={}, + ) + + if users: + self.db_pool.updates._background_update_progress_txn( + txn, "remove_deactivated_pushers", {"last_user": users[-1]} + ) + + return len(users) + + number_deleted = await self.db_pool.runInteraction( + "_remove_deactivated_pushers", _delete_pushers + ) + + if number_deleted < batch_size: + await self.db_pool.updates._end_background_update( + "remove_deactivated_pushers" + ) + + return number_deleted + class PusherStore(PusherWorkerStore): def get_pushers_stream_token(self) -> int: diff --git a/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql b/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql index 20ba4abca..0ec676415 100644 --- a/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql +++ b/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql @@ -14,8 +14,7 @@ */ --- We may not have deleted all pushers for deactivated accounts. Do so now. --- --- Note: We don't bother updating the `deleted_pushers` table as it's just use --- to stop pushers on workers, and that will happen when they get next restarted. -DELETE FROM pushers WHERE user_name IN (SELECT name FROM users WHERE deactivated = 1); +-- We may not have deleted all pushers for deactivated accounts, so we set up a +-- background job to delete them. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (5908, 'remove_deactivated_pushers', '{}');