Servlet to purge old rooms (#5845)

This commit is contained in:
Richard van der Hoff 2019-08-22 10:42:59 +01:00 committed by GitHub
parent ef1c524bb3
commit 119aa31b10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 232 additions and 0 deletions

1
changelog.d/5845.feature Normal file
View File

@ -0,0 +1 @@
Add an admin API to purge old rooms from the database.

View File

@ -0,0 +1,18 @@
Purge room API
==============
This API will remove all trace of a room from your database.
All local users must have left the room before it can be removed.
The API is:
```
POST /_synapse/admin/v1/purge_room
{
"room_id": "!room:id"
}
```
You must authenticate using the access token of an admin user.

View File

@ -70,6 +70,7 @@ class PaginationHandler(object):
self.auth = hs.get_auth() self.auth = hs.get_auth()
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.clock = hs.get_clock() self.clock = hs.get_clock()
self._server_name = hs.hostname
self.pagination_lock = ReadWriteLock() self.pagination_lock = ReadWriteLock()
self._purges_in_progress_by_room = set() self._purges_in_progress_by_room = set()
@ -153,6 +154,22 @@ class PaginationHandler(object):
""" """
return self._purges_by_id.get(purge_id) return self._purges_by_id.get(purge_id)
async def purge_room(self, room_id):
"""Purge the given room from the database"""
with (await self.pagination_lock.write(room_id)):
# check we know about the room
await self.store.get_room_version(room_id)
# first check that we have no users in this room
joined = await defer.maybeDeferred(
self.store.is_host_joined, room_id, self._server_name
)
if joined:
raise SynapseError(400, "Users are still joined to this room")
await self.store.purge_room(room_id)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_messages( def get_messages(
self, self,

View File

@ -42,6 +42,7 @@ from synapse.rest.admin._base import (
historical_admin_path_patterns, historical_admin_path_patterns,
) )
from synapse.rest.admin.media import register_servlets_for_media_repo from synapse.rest.admin.media import register_servlets_for_media_repo
from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.types import UserID, create_requester from synapse.types import UserID, create_requester
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
@ -738,6 +739,7 @@ def register_servlets(hs, http_server):
Register all the admin servlets. Register all the admin servlets.
""" """
register_servlets_for_client_rest_resource(hs, http_server) register_servlets_for_client_rest_resource(hs, http_server)
PurgeRoomServlet(hs).register(http_server)
SendServerNoticeServlet(hs).register(http_server) SendServerNoticeServlet(hs).register(http_server)
VersionServlet(hs).register(http_server) VersionServlet(hs).register(http_server)

View File

@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from synapse.http.servlet import (
RestServlet,
assert_params_in_dict,
parse_json_object_from_request,
)
from synapse.rest.admin import assert_requester_is_admin
class PurgeRoomServlet(RestServlet):
"""Servlet which will remove all trace of a room from the database
POST /_synapse/admin/v1/purge_room
{
"room_id": "!room:id"
}
returns:
{}
"""
PATTERNS = (re.compile("^/_synapse/admin/v1/purge_room$"),)
def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer): server
"""
self.hs = hs
self.auth = hs.get_auth()
self.pagination_handler = hs.get_pagination_handler()
async def on_POST(self, request):
await assert_requester_is_admin(self.auth, request)
body = parse_json_object_from_request(request)
assert_params_in_dict(body, ("room_id",))
await self.pagination_handler.purge_room(body["room_id"])
return (200, {})

View File

@ -2181,6 +2181,143 @@ class EventsStore(
return to_delete, to_dedelta return to_delete, to_dedelta
def purge_room(self, room_id):
"""Deletes all record of a room
Args:
room_id (str):
"""
return self.runInteraction("purge_room", self._purge_room_txn, room_id)
def _purge_room_txn(self, txn, room_id):
# first we have to delete the state groups states
logger.info("[purge] removing %s from state_groups_state", room_id)
txn.execute(
"""
DELETE FROM state_groups_state WHERE state_group IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)
# ... and the state group edges
logger.info("[purge] removing %s from state_group_edges", room_id)
txn.execute(
"""
DELETE FROM state_group_edges WHERE state_group IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)
# ... and the state groups
logger.info("[purge] removing %s from state_groups", room_id)
txn.execute(
"""
DELETE FROM state_groups WHERE id IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)
# and then tables which lack an index on room_id but have one on event_id
for table in (
"event_auth",
"event_edges",
"event_push_actions_staging",
"event_reference_hashes",
"event_relations",
"event_to_state_groups",
"redactions",
"rejections",
"state_events",
):
logger.info("[purge] removing %s from %s", room_id, table)
txn.execute(
"""
DELETE FROM %s WHERE event_id IN (
SELECT event_id FROM events WHERE room_id=?
)
"""
% (table,),
(room_id,),
)
# and finally, the tables with an index on room_id (or no useful index)
for table in (
"current_state_events",
"event_backward_extremities",
"event_forward_extremities",
"event_json",
"event_push_actions",
"event_search",
"events",
"group_rooms",
"public_room_list_stream",
"receipts_graph",
"receipts_linearized",
"room_aliases",
"room_depth",
"room_memberships",
"room_state",
"room_stats",
"room_stats_earliest_token",
"rooms",
"stream_ordering_to_exterm",
"topics",
"users_in_public_rooms",
"users_who_share_private_rooms",
# no useful index, but let's clear them anyway
"appservice_room_list",
"e2e_room_keys",
"event_push_summary",
"pusher_throttle",
"group_summary_rooms",
"local_invites",
"room_account_data",
"room_tags",
):
logger.info("[purge] removing %s from %s", room_id, table)
txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))
# Other tables we do NOT need to clear out:
#
# - blocked_rooms
# This is important, to make sure that we don't accidentally rejoin a blocked
# room after it was purged
#
# - user_directory
# This has a room_id column, but it is unused
#
# Other tables that we might want to consider clearing out include:
#
# - event_reports
# Given that these are intended for abuse management my initial
# inclination is to leave them in place.
#
# - current_state_delta_stream
# - ex_outlier_stream
# - room_tags_revisions
# The problem with these is that they are largeish and there is no room_id
# index on them. In any case we should be clearing out 'stream' tables
# periodically anyway (#5888)
# TODO: we could probably usefully do a bunch of cache invalidation here
logger.info("[purge] done")
@defer.inlineCallbacks @defer.inlineCallbacks
def is_event_after(self, event_id1, event_id2): def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream """Returns True if event_id1 is after event_id2 in the stream