diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index d4cf0fc59..7157fb1df 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -210,7 +210,9 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_handlers[update_name] = update_handler def register_background_index_update(self, update_name, index_name, - table, columns, where_clause=None): + table, columns, where_clause=None, + unique=False, + psql_only=False): """Helper for store classes to do a background index addition To use: @@ -226,6 +228,9 @@ class BackgroundUpdateStore(SQLBaseStore): index_name (str): name of index to add table (str): table to add index to columns (list[str]): columns/expressions to include in index + unique (bool): true to make a UNIQUE index + psql_only: true to only create this index on psql databases (useful + for virtual sqlite tables) """ def create_index_psql(conn): @@ -245,9 +250,11 @@ class BackgroundUpdateStore(SQLBaseStore): c.execute(sql) sql = ( - "CREATE INDEX CONCURRENTLY %(name)s ON %(table)s" + "CREATE %(unique)s INDEX CONCURRENTLY %(name)s" + " ON %(table)s" " (%(columns)s) %(where_clause)s" ) % { + "unique": "UNIQUE" if unique else "", "name": index_name, "table": table, "columns": ", ".join(columns), @@ -270,9 +277,10 @@ class BackgroundUpdateStore(SQLBaseStore): # down at the wrong moment - hance we use IF NOT EXISTS. (SQLite # has supported CREATE TABLE|INDEX IF NOT EXISTS since 3.3.0.) sql = ( - "CREATE INDEX IF NOT EXISTS %(name)s ON %(table)s" + "CREATE %(unique)s INDEX IF NOT EXISTS %(name)s ON %(table)s" " (%(columns)s)" ) % { + "unique": "UNIQUE" if unique else "", "name": index_name, "table": table, "columns": ", ".join(columns), @@ -284,13 +292,16 @@ class BackgroundUpdateStore(SQLBaseStore): if isinstance(self.database_engine, engines.PostgresEngine): runner = create_index_psql + elif psql_only: + runner = None else: runner = create_index_sqlite @defer.inlineCallbacks def updater(progress, batch_size): - logger.info("Adding index %s to %s", index_name, table) - yield self.runWithConnection(runner) + if runner is not None: + logger.info("Adding index %s to %s", index_name, table) + yield self.runWithConnection(runner) yield self._end_background_update(update_name) defer.returnValue(1) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index dbd63078c..ea6879c61 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -207,6 +207,18 @@ class EventsStore(SQLBaseStore): where_clause="contains_url = true AND outlier = false", ) + # an event_id index on event_search is useful for the purge_history + # api. Plus it means we get to enforce some integrity with a UNIQUE + # clause + self.register_background_index_update( + "event_search_event_id_idx", + index_name="event_search_event_id_idx", + table="event_search", + columns=["event_id"], + unique=True, + psql_only=True, + ) + self._event_persist_queue = _EventPeristenceQueue() def persist_events(self, events_and_contexts, backfilled=False): @@ -2022,6 +2034,8 @@ class EventsStore(SQLBaseStore): 400, "topological_ordering is greater than forward extremeties" ) + logger.debug("[purge] looking for events to delete") + txn.execute( "SELECT event_id, state_key FROM events" " LEFT JOIN state_events USING (room_id, event_id)" @@ -2030,6 +2044,14 @@ class EventsStore(SQLBaseStore): ) event_rows = txn.fetchall() + to_delete = [ + (event_id,) for event_id, state_key in event_rows + if state_key is None and not self.hs.is_mine_id(event_id) + ] + logger.info( + "[purge] found %i events before cutoff, of which %i are remote" + " non-state events to delete", len(event_rows), len(to_delete)) + for event_id, state_key in event_rows: txn.call_after(self._get_state_group_for_event.invalidate, (event_id,)) @@ -2080,6 +2102,7 @@ class EventsStore(SQLBaseStore): ) state_rows = txn.fetchall() + logger.debug("[purge] found %i redundant state groups", len(state_rows)) # make a set of the redundant state groups, so that we can look them up # efficiently @@ -2173,10 +2196,6 @@ class EventsStore(SQLBaseStore): ) # Delete all remote non-state events - to_delete = [ - (event_id,) for event_id, state_key in event_rows - if state_key is None and not self.hs.is_mine_id(event_id) - ] for table in ( "events", "event_json", @@ -2192,7 +2211,7 @@ class EventsStore(SQLBaseStore): "event_signatures", "rejections", ): - logger.debug("[purge] removing non-state events from %s", table) + logger.debug("[purge] removing remote non-state events from %s", table) txn.executemany( "DELETE FROM %s WHERE event_id = ?" % (table,), @@ -2200,7 +2219,7 @@ class EventsStore(SQLBaseStore): ) # Mark all state and own events as outliers - logger.debug("[purge] marking events as outliers") + logger.debug("[purge] marking remaining events as outliers") txn.executemany( "UPDATE events SET outlier = ?" " WHERE event_id = ?", @@ -2210,7 +2229,7 @@ class EventsStore(SQLBaseStore): ] ) - logger.debug("[purge] done") + logger.info("[purge] done") @defer.inlineCallbacks def is_event_after(self, event_id1, event_id2): diff --git a/synapse/storage/schema/delta/37/remove_auth_idx.py b/synapse/storage/schema/delta/37/remove_auth_idx.py index 784f3b348..20ad8bd5a 100644 --- a/synapse/storage/schema/delta/37/remove_auth_idx.py +++ b/synapse/storage/schema/delta/37/remove_auth_idx.py @@ -36,6 +36,10 @@ DROP INDEX IF EXISTS transactions_have_ref; -- and is used incredibly rarely. DROP INDEX IF EXISTS events_order_topo_stream_room; +-- an equivalent index to this actually gets re-created in delta 41, because it +-- turned out that deleting it wasn't a great plan :/. In any case, let's +-- delete it here, and delta 41 will create a new one with an added UNIQUE +-- constraint DROP INDEX IF EXISTS event_search_ev_idx; """ diff --git a/synapse/storage/schema/delta/41/event_search_event_id_idx.sql b/synapse/storage/schema/delta/41/event_search_event_id_idx.sql new file mode 100644 index 000000000..5d9cfecf3 --- /dev/null +++ b/synapse/storage/schema/delta/41/event_search_event_id_idx.sql @@ -0,0 +1,17 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT into background_updates (update_name, progress_json) + VALUES ('event_search_event_id_idx', '{}');