From a67bf0b074acfca69647030beb9b775359fe684d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 4 Jul 2016 16:02:50 +0100 Subject: [PATCH 1/7] Add storage function to purge history for a room --- synapse/storage/events.py | 140 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 88a6ff731..98c917ce1 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1281,6 +1281,146 @@ class EventsStore(SQLBaseStore): ) return self.runInteraction("get_all_new_events", get_all_new_events_txn) + def _delete_old_state_txn(self, txn, room_id, topological_ordering): + """Deletes old room state + """ + + # Tables that should be pruned: + # event_auth + # event_backward_extremities + # event_content_hashes + # event_destinations + # event_edge_hashes + # event_edges + # event_forward_extremities + # event_json + # event_push_actions + # event_reference_hashes + # event_search + # event_signatures + # event_to_state_groups + # events + # rejections + # room_depth + # state_groups + # state_groups_state + + # First ensure that we're not about to delete all the forward extremeties + txn.execute( + "SELECT e.event_id, e.depth FROM events as e " + "INNER JOIN event_forward_extremities as f " + "ON e.event_id = f.event_id " + "AND e.room_id = f.room_id " + "WHERE f.room_id = ?", + (room_id,) + ) + rows = txn.fetchall() + max_depth = max(row[0] for row in rows) + + if max_depth <= topological_ordering: + raise Exception("topological_ordering is greater than forward extremeties") + + txn.execute( + "SELECT event_id, state_key FROM events" + " LEFT JOIN state_events USING (room_id, event_id)" + " WHERE room_id = ? AND topological_ordering < ?", + (room_id, topological_ordering,) + ) + event_rows = txn.fetchall() + + # We calculate the new entries for the backward extremeties by finding + # all events that point to events that are to be purged + txn.execute( + "SELECT e.event_id FROM events as e" + " INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id" + " INNER JOIN events as e2 ON e2.event_id = ed.event_id" + " WHERE e.room_id = ? AND e.topological_ordering < ?" + " AND e2.topological_ordering >= ?", + (room_id, topological_ordering, topological_ordering) + ) + new_backwards_extrems = txn.fetchall() + + # Get all state groups that are only referenced by events that are + # to be deleted. + txn.execute( + "SELECT state_group FROM event_to_state_groups" + " INNER JOIN events USING (event_id)" + " WHERE state_group IN (" + " SELECT DISTINCT state_group FROM events" + " INNER JOIN event_to_state_groups USING (event_id)" + " WHERE room_id = ? AND topological_ordering < ?" + " )" + " GROUP BY state_group HAVING MAX(topological_ordering) < ?", + (room_id, topological_ordering, topological_ordering) + ) + state_rows = txn.fetchall() + txn.executemany( + "DELETE FROM state_groups_state WHERE state_group = ?", + state_rows + ) + txn.executemany( + "DELETE FROM state_groups WHERE id = ?", + state_rows + ) + # Delete all non-state + txn.executemany( + "DELETE FROM event_to_state_groups WHERE event_id = ?", + [(event_id,) for event_id, _ in event_rows] + ) + + txn.execute( + "UPDATE room_depth SET min_depth = ? WHERE room_id = ?", + (topological_ordering, room_id,) + ) + + # Delete all remote non-state events + to_delete = [ + (event_id,) for event_id, state_key in event_rows + if state_key is None and not self.hs.is_mine_id(event_id) + ] + to_not_delete = [ + (event_id,) for event_id, state_key in event_rows + if state_key is not None or self.hs.is_mine_id(event_id) + ] + for table in ( + "events", + "event_json", + "event_auth", + "event_content_hashes", + "event_destinations", + "event_edge_hashes", + "event_edges", + "event_forward_extremities", + "event_push_actions", + "event_reference_hashes", + "event_search", + "event_signatures", + "rejections", + "event_backward_extremities", + ): + txn.executemany( + "DELETE FROM %s WHERE event_id = ?" % (table,), + to_delete + ) + + # Update backward extremeties + txn.executemany( + "INSERT INTO event_backward_extremities (room_id, event_id)" + " VALUES (?, ?)", + [(room_id, event_id) for event_id, in new_backwards_extrems] + ) + + txn.executemany( + "DELETE FROM events WHERE event_id = ?", + to_delete + ) + # Mark all state and own events as outliers + txn.executemany( + "UPDATE events SET outlier = ?" + " WHERE event_id = ?", + to_not_delete + ) + AllNewEventsResult = namedtuple("AllNewEventsResult", [ "new_forward_events", "new_backfill_events", From 2d21d43c34751cffb5f324bd58ceff060f65f679 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Jul 2016 10:28:51 +0100 Subject: [PATCH 2/7] Add purge_history API --- synapse/handlers/federation.py | 2 +- synapse/handlers/message.py | 13 +++++++++++++ synapse/rest/client/v1/admin.py | 18 ++++++++++++++++++ synapse/storage/events.py | 6 ++++++ 4 files changed, 38 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6c0bc7eaf..351b21824 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1413,7 +1413,7 @@ class FederationHandler(BaseHandler): local_view = dict(auth_events) remote_view = dict(auth_events) remote_view.update({ - (d.type, d.state_key): d for d in different_events + (d.type, d.state_key): d for d in different_events if d }) new_state, prev_state = self.state_handler.resolve_events( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 15caf1950..878809d50 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -50,6 +50,19 @@ class MessageHandler(BaseHandler): self.validator = EventValidator() self.snapshot_cache = SnapshotCache() + @defer.inlineCallbacks + def purge_history(self, room_id, event_id): + event = yield self.store.get_event(event_id) + + if event.room_id != room_id: + raise SynapseError(400, "Event is for wrong room.") + + depth = event.depth + + # TODO: Lock. + + yield self.store.delete_old_state(room_id, depth) + @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, as_client_event=True): diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index e54c472e0..71537a7d0 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -77,6 +77,24 @@ class PurgeMediaCacheRestServlet(ClientV1RestServlet): defer.returnValue((200, ret)) +class PurgeHistoryRestServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns( + "/admin/purge_history/(?P[^/]*)/(?P[^/]*)" + ) + + @defer.inlineCallbacks + def on_POST(self, request, room_id, event_id): + requester = yield self.auth.get_user_by_req(request) + is_admin = yield self.auth.is_server_admin(requester.user) + + if not is_admin: + raise AuthError(403, "You are not a server admin") + + yield self.handlers.message_handler.purge_history(room_id, event_id) + + defer.returnValue((200, {})) + + class DeactivateAccountRestServlet(ClientV1RestServlet): PATTERNS = client_path_patterns("/admin/deactivate/(?P[^/]*)") diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 98c917ce1..c3b498bb3 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1281,6 +1281,12 @@ class EventsStore(SQLBaseStore): ) return self.runInteraction("get_all_new_events", get_all_new_events_txn) + def delete_old_state(self, room_id, topological_ordering): + return self.runInteraction( + "delete_old_state", + self._delete_old_state_txn, room_id, topological_ordering + ) + def _delete_old_state_txn(self, txn, room_id, topological_ordering): """Deletes old room state """ From 7335f0addae9ff473403eaaffd7d2b02a9f1991f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Jul 2016 14:44:25 +0100 Subject: [PATCH 3/7] Add ReadWriteLock --- synapse/util/async.py | 82 +++++++++++++++++++++++++++++++++++++ tests/util/test_rwlock.py | 85 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 167 insertions(+) create mode 100644 tests/util/test_rwlock.py diff --git a/synapse/util/async.py b/synapse/util/async.py index 40be7fe7e..c84b23ff4 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -194,3 +194,85 @@ class Linearizer(object): self.key_to_defer.pop(key, None) defer.returnValue(_ctx_manager()) + + +class ReadWriteLock(object): + """A deferred style read write lock. + + Example: + + with (yield read_write_lock.read("test_key")): + # do some work + """ + + # IMPLEMENTATION NOTES + # + # We track the most recent queued reader and writer deferreds (which get + # resolved when they release the lock). + # + # Read: We know its safe to acquire a read lock when the latest writer has + # been resolved. The new reader is appeneded to the list of latest readers. + # + # Write: We know its safe to acquire the write lock when both the latest + # writers and readers have been resolved. The new writer replaces the latest + # writer. + + def __init__(self): + # Latest readers queued + self.key_to_current_readers = {} + + # Latest writer queued + self.key_to_current_writer = {} + + @defer.inlineCallbacks + def read(self, key): + new_defer = defer.Deferred() + + curr_readers = self.key_to_current_readers.setdefault(key, set()) + curr_writer = self.key_to_current_writer.get(key, None) + + curr_readers.add(new_defer) + + # We wait for the latest writer to finish writing. We can safely ignore + # any existing readers... as they're readers. + yield curr_writer + + @contextmanager + def _ctx_manager(): + try: + yield + finally: + new_defer.callback(None) + self.key_to_current_readers.get(key, set()).discard(new_defer) + + defer.returnValue(_ctx_manager()) + + @defer.inlineCallbacks + def write(self, key): + new_defer = defer.Deferred() + + curr_readers = self.key_to_current_readers.get(key, set()) + curr_writer = self.key_to_current_writer.get(key, None) + + # We wait on all latest readers and writer. + to_wait_on = list(curr_readers) + if curr_writer: + to_wait_on.append(curr_writer) + + # We can clear the list of current readers since the new writer waits + # for them to finish. + curr_readers.clear() + self.key_to_current_writer[key] = new_defer + + yield defer.gatherResults(to_wait_on) + + @contextmanager + def _ctx_manager(): + try: + yield + finally: + new_defer.callback(None) + if self.key_to_current_writer[key] == new_defer: + self.key_to_current_writer.pop(key) + + defer.returnValue(_ctx_manager()) diff --git a/tests/util/test_rwlock.py b/tests/util/test_rwlock.py new file mode 100644 index 000000000..1d745ae1a --- /dev/null +++ b/tests/util/test_rwlock.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from tests import unittest + +from synapse.util.async import ReadWriteLock + + +class ReadWriteLockTestCase(unittest.TestCase): + + def _assert_called_before_not_after(self, lst, first_false): + for i, d in enumerate(lst[:first_false]): + self.assertTrue(d.called, msg="%d was unexpectedly false" % i) + + for i, d in enumerate(lst[first_false:]): + self.assertFalse( + d.called, msg="%d was unexpectedly true" % (i + first_false) + ) + + def test_rwlock(self): + rwlock = ReadWriteLock() + + key = object() + + ds = [ + rwlock.read(key), # 0 + rwlock.read(key), # 1 + rwlock.write(key), # 2 + rwlock.write(key), # 3 + rwlock.read(key), # 4 + rwlock.read(key), # 5 + rwlock.write(key), # 6 + ] + + self._assert_called_before_not_after(ds, 2) + + with ds[0].result: + self._assert_called_before_not_after(ds, 2) + self._assert_called_before_not_after(ds, 2) + + with ds[1].result: + self._assert_called_before_not_after(ds, 2) + self._assert_called_before_not_after(ds, 3) + + with ds[2].result: + self._assert_called_before_not_after(ds, 3) + self._assert_called_before_not_after(ds, 4) + + with ds[3].result: + self._assert_called_before_not_after(ds, 4) + self._assert_called_before_not_after(ds, 6) + + with ds[5].result: + self._assert_called_before_not_after(ds, 6) + self._assert_called_before_not_after(ds, 6) + + with ds[4].result: + self._assert_called_before_not_after(ds, 6) + self._assert_called_before_not_after(ds, 7) + + with ds[6].result: + pass + + d = rwlock.write(key) + self.assertTrue(d.called) + with d.result: + pass + + d = rwlock.read(key) + self.assertTrue(d.called) + with d.result: + pass From 8f8798bc0d572af103274fc07d3adac67ce7f51a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Jul 2016 15:30:25 +0100 Subject: [PATCH 4/7] Add ReadWriteLock for pagination and history prune --- synapse/handlers/message.py | 76 +++++++++++++++++++------------------ synapse/storage/stream.py | 4 +- 2 files changed, 41 insertions(+), 39 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 878809d50..ad2753c1b 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -26,7 +26,7 @@ from synapse.types import ( UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id ) from synapse.util import unwrapFirstError -from synapse.util.async import concurrently_execute, run_on_reactor +from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock from synapse.util.caches.snapshot_cache import SnapshotCache from synapse.util.logcontext import preserve_fn from synapse.visibility import filter_events_for_client @@ -50,6 +50,8 @@ class MessageHandler(BaseHandler): self.validator = EventValidator() self.snapshot_cache = SnapshotCache() + self.pagination_lock = ReadWriteLock() + @defer.inlineCallbacks def purge_history(self, room_id, event_id): event = yield self.store.get_event(event_id) @@ -59,9 +61,8 @@ class MessageHandler(BaseHandler): depth = event.depth - # TODO: Lock. - - yield self.store.delete_old_state(room_id, depth) + with (yield self.pagination_lock.write(room_id)): + yield self.store.delete_old_state(room_id, depth) @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, @@ -98,42 +99,43 @@ class MessageHandler(BaseHandler): source_config = pagin_config.get_source_config("room") - membership, member_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - - if source_config.direction == 'b': - # if we're going backwards, we might need to backfill. This - # requires that we have a topo token. - if room_token.topological: - max_topo = room_token.topological - else: - max_topo = yield self.store.get_max_topological_token_for_stream_and_room( - room_id, room_token.stream - ) - - if membership == Membership.LEAVE: - # If they have left the room then clamp the token to be before - # they left the room, to save the effort of loading from the - # database. - leave_token = yield self.store.get_topological_token_for_event( - member_event_id - ) - leave_token = RoomStreamToken.parse(leave_token) - if leave_token.topological < max_topo: - source_config.from_key = str(leave_token) - - yield self.hs.get_handlers().federation_handler.maybe_backfill( - room_id, max_topo + with (yield self.pagination_lock.read(room_id)): + membership, member_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id ) - events, next_key = yield data_source.get_pagination_rows( - requester.user, source_config, room_id - ) + if source_config.direction == 'b': + # if we're going backwards, we might need to backfill. This + # requires that we have a topo token. + if room_token.topological: + max_topo = room_token.topological + else: + max_topo = yield self.store.get_max_topological_token( + room_id, room_token.stream + ) - next_token = pagin_config.from_token.copy_and_replace( - "room_key", next_key - ) + if membership == Membership.LEAVE: + # If they have left the room then clamp the token to be before + # they left the room, to save the effort of loading from the + # database. + leave_token = yield self.store.get_topological_token_for_event( + member_event_id + ) + leave_token = RoomStreamToken.parse(leave_token) + if leave_token.topological < max_topo: + source_config.from_key = str(leave_token) + + yield self.hs.get_handlers().federation_handler.maybe_backfill( + room_id, max_topo + ) + + events, next_key = yield data_source.get_pagination_rows( + requester.user, source_config, room_id + ) + + next_token = pagin_config.from_token.copy_and_replace( + "room_key", next_key + ) if not events: defer.returnValue({ diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index b9ad965fd..3dda2dab5 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -487,13 +487,13 @@ class StreamStore(SQLBaseStore): row["topological_ordering"], row["stream_ordering"],) ) - def get_max_topological_token_for_stream_and_room(self, room_id, stream_key): + def get_max_topological_token(self, room_id, stream_key): sql = ( "SELECT max(topological_ordering) FROM events" " WHERE room_id = ? AND stream_ordering < ?" ) return self._execute( - "get_max_topological_token_for_stream_and_room", None, + "get_max_topological_token", None, sql, room_id, stream_key, ).addCallback( lambda r: r[0][0] if r else 0 From 67f2c901ea4196d869380c1c5cdd8569934857ed Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Jul 2016 15:56:59 +0100 Subject: [PATCH 5/7] Add rest servlet. Fix SQL. --- synapse/rest/client/v1/admin.py | 1 + synapse/storage/events.py | 9 ++++----- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 71537a7d0..b0cb31a44 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -124,3 +124,4 @@ def register_servlets(hs, http_server): WhoisRestServlet(hs).register(http_server) PurgeMediaCacheRestServlet(hs).register(http_server) DeactivateAccountRestServlet(hs).register(http_server) + PurgeHistoryRestServlet(hs).register(http_server) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c3b498bb3..23ebd5d4c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1384,10 +1384,6 @@ class EventsStore(SQLBaseStore): (event_id,) for event_id, state_key in event_rows if state_key is None and not self.hs.is_mine_id(event_id) ] - to_not_delete = [ - (event_id,) for event_id, state_key in event_rows - if state_key is not None or self.hs.is_mine_id(event_id) - ] for table in ( "events", "event_json", @@ -1424,7 +1420,10 @@ class EventsStore(SQLBaseStore): txn.executemany( "UPDATE events SET outlier = ?" " WHERE event_id = ?", - to_not_delete + [ + (True, event_id,) for event_id, state_key in event_rows + if state_key is not None or self.hs.is_mine_id(event_id) + ] ) From c98e1479bd39a64add0456299644e96480151625 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 Jul 2016 11:41:07 +0100 Subject: [PATCH 6/7] Return 400 rather than 500 --- synapse/storage/events.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 23ebd5d4c..c2136f3fd 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -23,6 +23,7 @@ from synapse.util.async import ObservableDeferred from synapse.util.logcontext import preserve_fn, PreserveLoggingContext from synapse.util.logutils import log_function from synapse.api.constants import EventTypes +from synapse.api.errors import SynapseError from canonicaljson import encode_canonical_json from collections import deque, namedtuple @@ -1324,7 +1325,9 @@ class EventsStore(SQLBaseStore): max_depth = max(row[0] for row in rows) if max_depth <= topological_ordering: - raise Exception("topological_ordering is greater than forward extremeties") + raise SynapseError( + 400, "topological_ordering is greater than forward extremeties" + ) txn.execute( "SELECT event_id, state_key FROM events" From b92e7955be10209fdd13cdb799b1ac55c981d086 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 Jul 2016 11:42:15 +0100 Subject: [PATCH 7/7] Comment --- synapse/storage/events.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c2136f3fd..b58294216 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1325,6 +1325,9 @@ class EventsStore(SQLBaseStore): max_depth = max(row[0] for row in rows) if max_depth <= topological_ordering: + # We need to ensure we don't delete all the events from the datanase + # otherwise we wouldn't be able to send any events (due to not + # having any backwards extremeties) raise SynapseError( 400, "topological_ordering is greater than forward extremeties" )