From 19f9227643b5099666878de33453bbe361f216fc Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 9 Jan 2018 16:25:04 +0000 Subject: [PATCH 01/12] avoid 80s GIN inserts by tweaking work_mem see https://github.com/matrix-org/synapse/issues/2753 for details --- synapse/storage/search.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 479b04c63..7b1166f41 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -106,6 +106,7 @@ class SearchStore(BackgroundUpdateStore): event_search_rows.append((event_id, room_id, key, value)) if isinstance(self.database_engine, PostgresEngine): + txn.execute("SET work_mem='256KB'") sql = ( "INSERT INTO event_search (event_id, room_id, key, vector)" " VALUES (?,?,?,to_tsvector('english', ?))" @@ -123,6 +124,9 @@ class SearchStore(BackgroundUpdateStore): clump = event_search_rows[index:index + INSERT_CLUMP_SIZE] txn.executemany(sql, clump) + if isinstance(self.database_engine, PostgresEngine): + txn.execute("RESET work_mem") + progress = { "target_min_stream_id_inclusive": target_min_stream_id, "max_stream_id_exclusive": min_stream_id, From e365ad329f3c7e12bb2126217acbc62bdf0b9aec Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 9 Jan 2018 16:30:30 +0000 Subject: [PATCH 02/12] oops, tweak work_mem when actually storing --- synapse/storage/room.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 23688430b..9e2bf1ab4 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -310,6 +310,7 @@ class RoomStore(SQLBaseStore): def _store_event_search_txn(self, txn, event, key, value): if isinstance(self.database_engine, PostgresEngine): + txn.execute("SET work_mem='256KB'") sql = ( "INSERT INTO event_search" " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)" @@ -323,6 +324,7 @@ class RoomStore(SQLBaseStore): event.origin_server_ts, ) ) + txn.execute("RESET work_mem") elif isinstance(self.database_engine, Sqlite3Engine): sql = ( "INSERT INTO event_search (event_id, room_id, key, value)" From e79db0a673ef79bfa30e435bf64b5c3b75ed98d9 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 9 Jan 2018 16:37:48 +0000 Subject: [PATCH 03/12] switch back from GIST to GIN indexes --- synapse/storage/search.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 479b04c63..ba7141563 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -31,7 +31,7 @@ class SearchStore(BackgroundUpdateStore): EVENT_SEARCH_UPDATE_NAME = "event_search" EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order" - EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist" + EVENT_SEARCH_USE_GIN_POSTGRES_NAME = "event_search_postgres_gin" def __init__(self, db_conn, hs): super(SearchStore, self).__init__(db_conn, hs) @@ -43,8 +43,8 @@ class SearchStore(BackgroundUpdateStore): self._background_reindex_search_order ) self.register_background_update_handler( - self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME, - self._background_reindex_gist_search + self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, + self._background_reindex_gin_search ) @defer.inlineCallbacks @@ -145,25 +145,30 @@ class SearchStore(BackgroundUpdateStore): defer.returnValue(result) @defer.inlineCallbacks - def _background_reindex_gist_search(self, progress, batch_size): + def _background_reindex_gin_search(self, progress, batch_size): + '''This handles old synapses which used GIST indexes; converting them + back to be GIN as per the actual schema. Otherwise it crashes out + as a NOOP + ''' + def create_index(conn): conn.rollback() conn.set_session(autocommit=True) c = conn.cursor() c.execute( - "CREATE INDEX CONCURRENTLY event_search_fts_idx_gist" - " ON event_search USING GIST (vector)" + "CREATE INDEX CONCURRENTLY event_search_fts_idx" + " ON event_search USING GIN (vector)" ) - c.execute("DROP INDEX event_search_fts_idx") + c.execute("DROP INDEX event_search_fts_idx_gist") conn.set_session(autocommit=False) if isinstance(self.database_engine, PostgresEngine): yield self.runWithConnection(create_index) - yield self._end_background_update(self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME) + yield self._end_background_update(self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME) defer.returnValue(1) @defer.inlineCallbacks From a66f489678dc05fa89e6849405c37a9a390e62fc Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 9 Jan 2018 16:55:51 +0000 Subject: [PATCH 04/12] fix GIST->GIN switch --- .../schema/delta/38/postgres_fts_gist.sql | 6 ++-- .../schema/delta/46/postgres_fts_gin.sql | 17 +++++++++++ synapse/storage/search.py | 28 +++++++++++-------- 3 files changed, 37 insertions(+), 14 deletions(-) create mode 100644 synapse/storage/schema/delta/46/postgres_fts_gin.sql diff --git a/synapse/storage/schema/delta/38/postgres_fts_gist.sql b/synapse/storage/schema/delta/38/postgres_fts_gist.sql index f090a7b75..5fe27d687 100644 --- a/synapse/storage/schema/delta/38/postgres_fts_gist.sql +++ b/synapse/storage/schema/delta/38/postgres_fts_gist.sql @@ -13,5 +13,7 @@ * limitations under the License. */ - INSERT into background_updates (update_name, progress_json) - VALUES ('event_search_postgres_gist', '{}'); +-- We no longer do this given we back it out again in schema 46 + +-- INSERT into background_updates (update_name, progress_json) +-- VALUES ('event_search_postgres_gist', '{}'); diff --git a/synapse/storage/schema/delta/46/postgres_fts_gin.sql b/synapse/storage/schema/delta/46/postgres_fts_gin.sql new file mode 100644 index 000000000..31d7a817e --- /dev/null +++ b/synapse/storage/schema/delta/46/postgres_fts_gin.sql @@ -0,0 +1,17 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT into background_updates (update_name, progress_json) + VALUES ('event_search_postgres_gin', '{}'); diff --git a/synapse/storage/search.py b/synapse/storage/search.py index ba7141563..d3e76b58d 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -146,24 +146,28 @@ class SearchStore(BackgroundUpdateStore): @defer.inlineCallbacks def _background_reindex_gin_search(self, progress, batch_size): - '''This handles old synapses which used GIST indexes; converting them - back to be GIN as per the actual schema. Otherwise it crashes out - as a NOOP + '''This handles old synapses which used GIST indexes, if any; + converting them back to be GIN as per the actual schema. ''' def create_index(conn): - conn.rollback() - conn.set_session(autocommit=True) - c = conn.cursor() + try: + conn.rollback() + conn.set_session(autocommit=True) + c = conn.cursor() - c.execute( - "CREATE INDEX CONCURRENTLY event_search_fts_idx" - " ON event_search USING GIN (vector)" - ) + c.execute( + "CREATE INDEX CONCURRENTLY event_search_fts_idx" + " ON event_search USING GIN (vector)" + ) - c.execute("DROP INDEX event_search_fts_idx_gist") + c.execute("DROP INDEX event_search_fts_idx_gist") - conn.set_session(autocommit=False) + conn.set_session(autocommit=False) + except e: + logger.warn( + "Ignoring error %s when trying to switch from GIST to GIN" % (e,) + ) if isinstance(self.database_engine, PostgresEngine): yield self.runWithConnection(create_index) From 174eacc8ba71015003a78594ebc89cbe45d8384a Mon Sep 17 00:00:00 2001 From: hera Date: Tue, 9 Jan 2018 18:06:30 +0000 Subject: [PATCH 05/12] oops --- synapse/storage/room.py | 2 +- synapse/storage/search.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 9e2bf1ab4..0604f8f27 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -310,7 +310,7 @@ class RoomStore(SQLBaseStore): def _store_event_search_txn(self, txn, event, key, value): if isinstance(self.database_engine, PostgresEngine): - txn.execute("SET work_mem='256KB'") + txn.execute("SET work_mem='256kB'") sql = ( "INSERT INTO event_search" " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)" diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 7b1166f41..f52f3c859 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -106,7 +106,7 @@ class SearchStore(BackgroundUpdateStore): event_search_rows.append((event_id, room_id, key, value)) if isinstance(self.database_engine, PostgresEngine): - txn.execute("SET work_mem='256KB'") + txn.execute("SET work_mem='256kB'") sql = ( "INSERT INTO event_search (event_id, room_id, key, vector)" " VALUES (?,?,?,to_tsvector('english', ?))" From 6b02fc80d173d3d4de81623d411a136abe1637e9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 2 Feb 2018 14:32:51 +0000 Subject: [PATCH 06/12] Reinstate event_search_postgres_gist handler People may have queued updates for this, so we can't just delete it. --- synapse/storage/background_updates.py | 19 +++++++++++++++++++ synapse/storage/registration.py | 7 +------ synapse/storage/search.py | 11 +++++++++++ 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 11a1b942f..c88759bf2 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -242,6 +242,25 @@ class BackgroundUpdateStore(SQLBaseStore): """ self._background_update_handlers[update_name] = update_handler + def register_noop_background_update(self, update_name): + """Register a noop handler for a background update. + + This is useful when we previously did a background update, but no + longer wish to do the update. In this case the background update should + be removed from the schema delta files, but there may still be some + users who have the background update queued, so this method should + also be called to clear the update. + + Args: + update_name (str): Name of update + """ + @defer.inlineCallbacks + def noop_update(progress, batch_size): + yield self._end_background_update(update_name) + defer.returnValue(1) + + self.register_background_update_handler(update_name, noop_update) + def register_background_index_update(self, update_name, index_name, table, columns, where_clause=None, unique=False, diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 3aa810981..95f75d6df 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -39,12 +39,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): # we no longer use refresh tokens, but it's possible that some people # might have a background update queued to build this index. Just # clear the background update. - @defer.inlineCallbacks - def noop_update(progress, batch_size): - yield self._end_background_update("refresh_tokens_device_index") - defer.returnValue(1) - self.register_background_update_handler( - "refresh_tokens_device_index", noop_update) + self.register_noop_background_update("refresh_tokens_device_index") @defer.inlineCallbacks def add_access_token_to_user(self, user_id, token, device_id=None): diff --git a/synapse/storage/search.py b/synapse/storage/search.py index d3e76b58d..13c827cf8 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -31,6 +31,7 @@ class SearchStore(BackgroundUpdateStore): EVENT_SEARCH_UPDATE_NAME = "event_search" EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order" + EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist" EVENT_SEARCH_USE_GIN_POSTGRES_NAME = "event_search_postgres_gin" def __init__(self, db_conn, hs): @@ -42,6 +43,16 @@ class SearchStore(BackgroundUpdateStore): self.EVENT_SEARCH_ORDER_UPDATE_NAME, self._background_reindex_search_order ) + + # we used to have a background update to turn the GIN index into a + # GIST one; we no longer do that (obviously) because we actually want + # a GIN index. However, it's possible that some people might still have + # the background update queued, so we register a handler to clear the + # background update. + self.register_noop_background_update( + self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME, + ) + self.register_background_update_handler( self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search From 4eeae7ad657729eb8c2765da6fb40fc983c740f7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Sat, 3 Feb 2018 22:57:33 +0000 Subject: [PATCH 07/12] Move store_event_search_txn to SearchStore ... as a precursor to making event storing and doing the bg update share some code. --- synapse/storage/room.py | 45 +++++++-------------------------------- synapse/storage/search.py | 35 ++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 37 deletions(-) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 9f373b47e..0fcfb7f86 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -16,11 +16,9 @@ from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.storage.search import SearchStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks -from ._base import SQLBaseStore -from .engines import PostgresEngine, Sqlite3Engine - import collections import logging import ujson as json @@ -40,7 +38,7 @@ RatelimitOverride = collections.namedtuple( ) -class RoomStore(SQLBaseStore): +class RoomStore(SearchStore): @defer.inlineCallbacks def store_room(self, room_id, room_creator_user_id, is_public): @@ -263,8 +261,8 @@ class RoomStore(SQLBaseStore): }, ) - self._store_event_search_txn( - txn, event, "content.topic", event.content["topic"] + self.store_event_search_txn( + txn, event, "content.topic", event.content["topic"], ) def _store_room_name_txn(self, txn, event): @@ -279,14 +277,14 @@ class RoomStore(SQLBaseStore): } ) - self._store_event_search_txn( - txn, event, "content.name", event.content["name"] + self.store_event_search_txn( + txn, event, "content.name", event.content["name"], ) def _store_room_message_txn(self, txn, event): if hasattr(event, "content") and "body" in event.content: - self._store_event_search_txn( - txn, event, "content.body", event.content["body"] + self.store_event_search_txn( + txn, event, "content.body", event.content["body"], ) def _store_history_visibility_txn(self, txn, event): @@ -308,33 +306,6 @@ class RoomStore(SQLBaseStore): event.content[key] )) - def _store_event_search_txn(self, txn, event, key, value): - if isinstance(self.database_engine, PostgresEngine): - txn.execute("SET work_mem='256kB'") - sql = ( - "INSERT INTO event_search" - " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)" - " VALUES (?,?,?,to_tsvector('english', ?),?,?)" - ) - txn.execute( - sql, - ( - event.event_id, event.room_id, key, value, - event.internal_metadata.stream_ordering, - event.origin_server_ts, - ) - ) - txn.execute("RESET work_mem") - elif isinstance(self.database_engine, Sqlite3Engine): - sql = ( - "INSERT INTO event_search (event_id, room_id, key, value)" - " VALUES (?,?,?,?)" - ) - txn.execute(sql, (event.event_id, event.room_id, key, value,)) - else: - # This should be unreachable. - raise Exception("Unrecognized database engine") - def add_event_report(self, room_id, event_id, user_id, reason, content, received_ts): next_id = self._event_reports_id_gen.get_next() diff --git a/synapse/storage/search.py b/synapse/storage/search.py index f52f3c859..205e8d001 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -246,6 +246,41 @@ class SearchStore(BackgroundUpdateStore): defer.returnValue(num_rows) + def store_event_search_txn(self, txn, event, key, value): + """Add event to the search table + + Args: + txn (cursor): + event (EventBase): + key (str): + value (str): + """ + if isinstance(self.database_engine, PostgresEngine): + txn.execute("SET work_mem='256kB'") + sql = ( + "INSERT INTO event_search" + " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)" + " VALUES (?,?,?,to_tsvector('english', ?),?,?)" + ) + txn.execute( + sql, + ( + event.event_id, event.room_id, key, value, + event.internal_metadata.stream_ordering, + event.origin_server_ts, + ) + ) + txn.execute("RESET work_mem") + elif isinstance(self.database_engine, Sqlite3Engine): + sql = ( + "INSERT INTO event_search (event_id, room_id, key, value)" + " VALUES (?,?,?,?)" + ) + txn.execute(sql, (event.event_id, event.room_id, key, value,)) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + @defer.inlineCallbacks def search_msgs(self, room_ids, search_term, keys): """Performs a full text search over events with given keys. From bd25f9cf36ff86d1616853d88cebd2a4a83fa552 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Sat, 3 Feb 2018 23:05:41 +0000 Subject: [PATCH 08/12] Clean up work_mem handling Add some comments and improve exception handling when twiddling work_mem for the search update --- synapse/storage/search.py | 52 ++++++++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 205e8d001..190751bad 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import sys from twisted.internet import defer from .background_updates import BackgroundUpdateStore @@ -256,21 +256,51 @@ class SearchStore(BackgroundUpdateStore): value (str): """ if isinstance(self.database_engine, PostgresEngine): - txn.execute("SET work_mem='256kB'") sql = ( "INSERT INTO event_search" - " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)" + " (event_id, room_id, key, vector, stream_ordering, " + " origin_server_ts)" " VALUES (?,?,?,to_tsvector('english', ?),?,?)" ) - txn.execute( - sql, - ( - event.event_id, event.room_id, key, value, - event.internal_metadata.stream_ordering, - event.origin_server_ts, + + # inserts to a GIN index are normally batched up into a pending + # list, and then all committed together once the list gets to a + # certain size. The trouble with that is that postgres (pre-9.5) + # uses work_mem to determine the length of the list, and work_mem + # is typically very large. + # + # We therefore reduce work_mem while we do the insert. + # + # (postgres 9.5 uses the separate gin_pending_list_limit setting, + # so doesn't suffer the same problem, but changing work_mem will + # be harmless) + + txn.execute("SET work_mem='256kB'") + try: + txn.execute( + sql, + ( + event.event_id, event.room_id, key, value, + event.internal_metadata.stream_ordering, + event.origin_server_ts, + ) ) - ) - txn.execute("RESET work_mem") + except Exception: + # we need to reset work_mem, but doing so may throw a new + # exception and we want to preserve the original + t, v, tb = sys.exc_info() + try: + txn.execute("RESET work_mem") + except Exception as e: + logger.warn( + "exception resetting work_mem during exception " + "handling: %r", + e, + ) + raise t, v, tb + else: + txn.execute("RESET work_mem") + elif isinstance(self.database_engine, Sqlite3Engine): sql = ( "INSERT INTO event_search (event_id, room_id, key, value)" From 80b8a28100e29e34bdc6226513575789310aa41f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Sat, 3 Feb 2018 23:07:13 +0000 Subject: [PATCH 09/12] Factor out common code for search insert we can reuse the same code as is used for event insert, for doing the background index population. --- synapse/storage/search.py | 89 ++++++++++++++++++++++++--------------- 1 file changed, 56 insertions(+), 33 deletions(-) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 190751bad..eecf77851 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from collections import namedtuple import sys from twisted.internet import defer @@ -26,6 +27,11 @@ import ujson as json logger = logging.getLogger(__name__) +SearchEntry = namedtuple('SearchEntry', [ + 'key', 'value', 'event_id', 'room_id', 'stream_ordering', + 'origin_server_ts', +]) + class SearchStore(BackgroundUpdateStore): @@ -49,16 +55,17 @@ class SearchStore(BackgroundUpdateStore): @defer.inlineCallbacks def _background_reindex_search(self, progress, batch_size): + # we work through the events table from highest stream id to lowest target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] rows_inserted = progress.get("rows_inserted", 0) - INSERT_CLUMP_SIZE = 1000 TYPES = ["m.room.name", "m.room.message", "m.room.topic"] def reindex_search_txn(txn): sql = ( - "SELECT stream_ordering, event_id, room_id, type, content FROM events" + "SELECT stream_ordering, event_id, room_id, type, content, " + " origin_server_ts FROM events" " WHERE ? <= stream_ordering AND stream_ordering < ?" " AND (%s)" " ORDER BY stream_ordering DESC" @@ -67,6 +74,10 @@ class SearchStore(BackgroundUpdateStore): txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) + # we could stream straight from the results into + # store_search_entries_txn with a generator function, but that + # would mean having two cursors open on the database at once. + # Instead we just build a list of results. rows = self.cursor_to_dict(txn) if not rows: return 0 @@ -79,6 +90,8 @@ class SearchStore(BackgroundUpdateStore): event_id = row["event_id"] room_id = row["room_id"] etype = row["type"] + stream_ordering = row["stream_ordering"] + origin_server_ts = row["origin_server_ts"] try: content = json.loads(row["content"]) except Exception: @@ -93,6 +106,8 @@ class SearchStore(BackgroundUpdateStore): elif etype == "m.room.name": key = "content.name" value = content["name"] + else: + raise Exception("unexpected event type %s" % etype) except (KeyError, AttributeError): # If the event is missing a necessary field then # skip over it. @@ -103,29 +118,16 @@ class SearchStore(BackgroundUpdateStore): # then skip over it continue - event_search_rows.append((event_id, room_id, key, value)) + event_search_rows.append(SearchEntry( + key=key, + value=value, + event_id=event_id, + room_id=room_id, + stream_ordering=stream_ordering, + origin_server_ts=origin_server_ts, + )) - if isinstance(self.database_engine, PostgresEngine): - txn.execute("SET work_mem='256kB'") - sql = ( - "INSERT INTO event_search (event_id, room_id, key, vector)" - " VALUES (?,?,?,to_tsvector('english', ?))" - ) - elif isinstance(self.database_engine, Sqlite3Engine): - sql = ( - "INSERT INTO event_search (event_id, room_id, key, value)" - " VALUES (?,?,?,?)" - ) - else: - # This should be unreachable. - raise Exception("Unrecognized database engine") - - for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE): - clump = event_search_rows[index:index + INSERT_CLUMP_SIZE] - txn.executemany(sql, clump) - - if isinstance(self.database_engine, PostgresEngine): - txn.execute("RESET work_mem") + self.store_search_entries_txn(txn, event_search_rows) progress = { "target_min_stream_id_inclusive": target_min_stream_id, @@ -255,6 +257,26 @@ class SearchStore(BackgroundUpdateStore): key (str): value (str): """ + self.store_search_entries_txn( + txn, + (SearchEntry( + key=key, + value=value, + event_id=event.event_id, + room_id=event.room_id, + stream_ordering=event.internal_metadata.stream_ordering, + origin_server_ts=event.origin_server_ts, + ),), + ) + + def store_search_entries_txn(self, txn, entries): + """Add entries to the search table + + Args: + txn (cursor): + entries (iterable[SearchEntry]): + entries to be added to the table + """ if isinstance(self.database_engine, PostgresEngine): sql = ( "INSERT INTO event_search" @@ -262,6 +284,10 @@ class SearchStore(BackgroundUpdateStore): " origin_server_ts)" " VALUES (?,?,?,to_tsvector('english', ?),?,?)" ) + args = (( + entry.event_id, entry.room_id, entry.key, entry.value, + entry.stream_ordering, entry.origin_server_ts, + ) for entry in entries) # inserts to a GIN index are normally batched up into a pending # list, and then all committed together once the list gets to a @@ -277,14 +303,7 @@ class SearchStore(BackgroundUpdateStore): txn.execute("SET work_mem='256kB'") try: - txn.execute( - sql, - ( - event.event_id, event.room_id, key, value, - event.internal_metadata.stream_ordering, - event.origin_server_ts, - ) - ) + txn.executemany(sql, args) except Exception: # we need to reset work_mem, but doing so may throw a new # exception and we want to preserve the original @@ -306,7 +325,11 @@ class SearchStore(BackgroundUpdateStore): "INSERT INTO event_search (event_id, room_id, key, value)" " VALUES (?,?,?,?)" ) - txn.execute(sql, (event.event_id, event.room_id, key, value,)) + args = (( + entry.event_id, entry.room_id, entry.key, entry.value, + ) for entry in entries) + + txn.executemany(sql, args) else: # This should be unreachable. raise Exception("Unrecognized database engine") From 4a6d5517049c5b8b9e43df43a10a0dda5db07244 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 2 Feb 2018 15:25:27 +0000 Subject: [PATCH 10/12] GIN reindex: Fix syntax errors, improve exception handling --- synapse/storage/search.py | 40 ++++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 13c827cf8..076ecff29 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -157,28 +157,42 @@ class SearchStore(BackgroundUpdateStore): @defer.inlineCallbacks def _background_reindex_gin_search(self, progress, batch_size): - '''This handles old synapses which used GIST indexes, if any; + """This handles old synapses which used GIST indexes, if any; converting them back to be GIN as per the actual schema. - ''' + """ def create_index(conn): + conn.rollback() + + # we have to set autocommit, because postgres refuses to + # CREATE INDEX CONCURRENTLY without it. + conn.set_session(autocommit=True) + try: - conn.rollback() - conn.set_session(autocommit=True) c = conn.cursor() + # if we skipped the conversion to GIST, we may already/still + # have an event_search_fts_idx; unfortunately postgres 9.4 + # doesn't support CREATE INDEX IF EXISTS so we just catch the + # exception and ignore it. + import psycopg2 + try: + c.execute( + "CREATE INDEX CONCURRENTLY event_search_fts_idx" + " ON event_search USING GIN (vector)" + ) + except psycopg2.ProgrammingError as e: + logger.warn( + "Ignoring error %r when trying to switch from GIST to GIN", + e + ) + + # we should now be able to delete the GIST index. c.execute( - "CREATE INDEX CONCURRENTLY event_search_fts_idx" - " ON event_search USING GIN (vector)" + "DROP INDEX IF EXISTS event_search_fts_idx_gist" ) - - c.execute("DROP INDEX event_search_fts_idx_gist") - + finally: conn.set_session(autocommit=False) - except e: - logger.warn( - "Ignoring error %s when trying to switch from GIST to GIN" % (e,) - ) if isinstance(self.database_engine, PostgresEngine): yield self.runWithConnection(create_index) From 0b27ae8dc3957e77561b2ff35a5a127532f6f9f1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Sat, 3 Feb 2018 23:12:27 +0000 Subject: [PATCH 11/12] move search reindex to schema 47 We're up to schema v47 on develop now, so this will have to go in there to have an effect. This might cause an error if somebody has already run it in the v46 guise, and runs it again in the v47 guise, because it will cause a duplicate entry in the bbackground_updates table. On the other hand, the entry is removed once it is complete, and it is unlikely that anyone other than matrix.org has run it on v46. The update itself is harmless to re-run because it deliberately copes with the index already existing. --- synapse/storage/schema/delta/38/postgres_fts_gist.sql | 2 +- synapse/storage/schema/delta/{46 => 47}/postgres_fts_gin.sql | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename synapse/storage/schema/delta/{46 => 47}/postgres_fts_gin.sql (100%) diff --git a/synapse/storage/schema/delta/38/postgres_fts_gist.sql b/synapse/storage/schema/delta/38/postgres_fts_gist.sql index 5fe27d687..515e6b8e8 100644 --- a/synapse/storage/schema/delta/38/postgres_fts_gist.sql +++ b/synapse/storage/schema/delta/38/postgres_fts_gist.sql @@ -13,7 +13,7 @@ * limitations under the License. */ --- We no longer do this given we back it out again in schema 46 +-- We no longer do this given we back it out again in schema 47 -- INSERT into background_updates (update_name, progress_json) -- VALUES ('event_search_postgres_gist', '{}'); diff --git a/synapse/storage/schema/delta/46/postgres_fts_gin.sql b/synapse/storage/schema/delta/47/postgres_fts_gin.sql similarity index 100% rename from synapse/storage/schema/delta/46/postgres_fts_gin.sql rename to synapse/storage/schema/delta/47/postgres_fts_gin.sql From 5978dccff09e647509bb92e8125aa02e87f7a0a2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 14 Feb 2018 15:54:09 +0000 Subject: [PATCH 12/12] remove overzealous exception handling --- synapse/storage/search.py | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 8d294d497..2755acff4 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -16,7 +16,6 @@ from collections import namedtuple import logging import re -import sys import ujson as json from twisted.internet import defer @@ -335,25 +334,18 @@ class SearchStore(BackgroundUpdateStore): # (postgres 9.5 uses the separate gin_pending_list_limit setting, # so doesn't suffer the same problem, but changing work_mem will # be harmless) + # + # Note that we don't need to worry about restoring it on + # exception, because exceptions will cause the transaction to be + # rolled back, including the effects of the SET command. + # + # Also: we use SET rather than SET LOCAL because there's lots of + # other stuff going on in this transaction, which want to have the + # normal work_mem setting. txn.execute("SET work_mem='256kB'") - try: - txn.executemany(sql, args) - except Exception: - # we need to reset work_mem, but doing so may throw a new - # exception and we want to preserve the original - t, v, tb = sys.exc_info() - try: - txn.execute("RESET work_mem") - except Exception as e: - logger.warn( - "exception resetting work_mem during exception " - "handling: %r", - e, - ) - raise t, v, tb - else: - txn.execute("RESET work_mem") + txn.executemany(sql, args) + txn.execute("RESET work_mem") elif isinstance(self.database_engine, Sqlite3Engine): sql = (