From 57ad702af0511aff36ca69fb2e9fc3399cce3a8d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Jan 2020 17:17:44 +0000 Subject: [PATCH] Backgroud update to clean out rooms from current state (#6802) --- changelog.d/6802.misc | 1 + .../57/delete_old_current_state_events.sql | 19 +++ synapse/storage/data_stores/main/state.py | 108 +++++++++++++++++- 3 files changed, 126 insertions(+), 2 deletions(-) create mode 100644 changelog.d/6802.misc create mode 100644 synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql diff --git a/changelog.d/6802.misc b/changelog.d/6802.misc new file mode 100644 index 000000000..a77ba1d7a --- /dev/null +++ b/changelog.d/6802.misc @@ -0,0 +1 @@ +Add background update to clean out left rooms from current state. diff --git a/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql b/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql new file mode 100644 index 000000000..a133d87a1 --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql @@ -0,0 +1,19 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Add background update to go and delete current state events for rooms the +-- server is no longer in. +INSERT into background_updates (update_name, progress_json) + VALUES ('delete_old_current_state_events', '{}'); diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py index bd7b0276f..9b6f68e77 100644 --- a/synapse/storage/data_stores/main/state.py +++ b/synapse/storage/data_stores/main/state.py @@ -21,12 +21,13 @@ from six import iteritems from twisted.internet import defer -from synapse.api.constants import EventTypes +from synapse.api.constants import EventTypes, Membership from synapse.api.errors import NotFoundError from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.storage._base import SQLBaseStore from synapse.storage.data_stores.main.events_worker import EventsWorkerStore +from synapse.storage.data_stores.main.roommember import RoomMemberWorkerStore from synapse.storage.database import Database from synapse.storage.state import StateFilter from synapse.util.caches import intern_string @@ -300,14 +301,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): return set(row["state_group"] for row in rows) -class MainStateBackgroundUpdateStore(SQLBaseStore): +class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx" EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index" + DELETE_CURRENT_STATE_UPDATE_NAME = "delete_old_current_state_events" def __init__(self, database: Database, db_conn, hs): super(MainStateBackgroundUpdateStore, self).__init__(database, db_conn, hs) + self.server_name = hs.hostname + self.db.updates.register_background_index_update( self.CURRENT_STATE_INDEX_UPDATE_NAME, index_name="current_state_events_member_index", @@ -321,6 +325,106 @@ class MainStateBackgroundUpdateStore(SQLBaseStore): table="event_to_state_groups", columns=["state_group"], ) + self.db.updates.register_background_update_handler( + self.DELETE_CURRENT_STATE_UPDATE_NAME, self._background_remove_left_rooms, + ) + + async def _background_remove_left_rooms(self, progress, batch_size): + """Background update to delete rows from `current_state_events` and + `event_forward_extremities` tables of rooms that the server is no + longer joined to. + """ + + last_room_id = progress.get("last_room_id", "") + + def _background_remove_left_rooms_txn(txn): + sql = """ + SELECT DISTINCT room_id FROM current_state_events + WHERE room_id > ? ORDER BY room_id LIMIT ? + """ + + txn.execute(sql, (last_room_id, batch_size)) + room_ids = list(row[0] for row in txn) + if not room_ids: + return True, set() + + sql = """ + SELECT room_id + FROM current_state_events + WHERE + room_id > ? AND room_id <= ? + AND type = 'm.room.member' + AND membership = 'join' + AND state_key LIKE ? + GROUP BY room_id + """ + + txn.execute(sql, (last_room_id, room_ids[-1], "%:" + self.server_name)) + + joined_room_ids = set(row[0] for row in txn) + + left_rooms = set(room_ids) - joined_room_ids + + # First we get all users that we still think were joined to the + # room. This is so that we can mark those device lists as + # potentially stale, since there may have been a period where the + # server didn't share a room with the remote user and therefore may + # have missed any device updates. + rows = self.db.simple_select_many_txn( + txn, + table="current_state_events", + column="room_id", + iterable=left_rooms, + keyvalues={"type": EventTypes.Member, "membership": Membership.JOIN}, + retcols=("state_key",), + ) + + potentially_left_users = set(row["state_key"] for row in rows) + + # Now lets actually delete the rooms from the DB. + self.db.simple_delete_many_txn( + txn, + table="current_state_events", + column="room_id", + iterable=left_rooms, + keyvalues={}, + ) + + self.db.simple_delete_many_txn( + txn, + table="event_forward_extremities", + column="room_id", + iterable=left_rooms, + keyvalues={}, + ) + + self.db.updates._background_update_progress_txn( + txn, + self.DELETE_CURRENT_STATE_UPDATE_NAME, + {"last_room_id": room_ids[-1]}, + ) + + return False, potentially_left_users + + finished, potentially_left_users = await self.db.runInteraction( + "_background_remove_left_rooms", _background_remove_left_rooms_txn + ) + + if finished: + await self.db.updates._end_background_update( + self.DELETE_CURRENT_STATE_UPDATE_NAME + ) + + # Now go and check if we still share a room with the remote users in + # the deleted rooms. If not mark their device lists as stale. + joined_users = await self.get_users_server_still_shares_room_with( + potentially_left_users + ) + + for user_id in potentially_left_users - joined_users: + await self.mark_remote_user_device_list_as_unsubscribed(user_id) + + return batch_size class StateStore(StateGroupWorkerStore, MainStateBackgroundUpdateStore):