Handle half-created indices in receipts index background update (#14650)

When Synapse is terminated while running the background update to create
the `receipts_graph` or `receipts_linearized` indexes, the indexes may
be successfully created (or marked as invalid on postgres) while the
background update remains unfinished. When Synapse next starts up, the
background update will fail because the index already exists, or exists
but is invalid on postgres.

Use the existing code to create indices in background updates, since it
handles these edge cases.
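
For context, the pattern that makes the shared helper safe to re-run looks roughly like the sketch below: on postgres, drop any half-built index left over from an interrupted attempt, then recreate it outside a transaction. This is a minimal standalone illustration using psycopg2 directly; Synapse's actual implementation lives in `BackgroundUpdater.create_index_in_background` and goes through its own connection pool and database engine abstractions.

# Minimal sketch (not Synapse's actual code) of retry-safe index creation
# on postgres, using the receipts index from this change as the example.
import psycopg2

def create_index_retry_safe(conn: "psycopg2.extensions.connection") -> None:
    # CREATE INDEX CONCURRENTLY refuses to run inside a transaction block,
    # so the session has to be switched to autocommit first.
    conn.autocommit = True
    with conn.cursor() as cur:
        # An interrupted CREATE INDEX CONCURRENTLY leaves the index behind
        # but marked invalid; dropping it unconditionally makes the CREATE
        # below safe to re-run after a crash or restart.
        cur.execute("DROP INDEX IF EXISTS receipts_linearized_unique_index")
        cur.execute(
            "CREATE UNIQUE INDEX CONCURRENTLY receipts_linearized_unique_index "
            "ON receipts_linearized (room_id, receipt_type, user_id) "
            "WHERE thread_id IS NULL"
        )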

Signed-off-by: Sean Quah <seanq@matrix.org>
Sean Quah, 2022-12-09 23:02:11 +00:00, committed by GitHub
commit 373c485d8c (parent 3ac412b4e2)
3 changed files with 60 additions and 48 deletions

changelog.d/14650.bugfix (new file)

@@ -0,0 +1,2 @@
+Fix a bug introduced in Synapse 1.72.0 where the background updates to add non-thread unique indexes on receipts would fail if they were previously interrupted.

synapse/storage/background_updates.py

@@ -544,6 +544,48 @@ class BackgroundUpdater:
                 The named index will be dropped upon completion of the new index.
         """
 
+        async def updater(progress: JsonDict, batch_size: int) -> int:
+            await self.create_index_in_background(
+                index_name=index_name,
+                table=table,
+                columns=columns,
+                where_clause=where_clause,
+                unique=unique,
+                psql_only=psql_only,
+                replaces_index=replaces_index,
+            )
+            await self._end_background_update(update_name)
+            return 1
+
+        self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
+            updater, oneshot=True
+        )
+
+    async def create_index_in_background(
+        self,
+        index_name: str,
+        table: str,
+        columns: Iterable[str],
+        where_clause: Optional[str] = None,
+        unique: bool = False,
+        psql_only: bool = False,
+        replaces_index: Optional[str] = None,
+    ) -> None:
+        """Add an index in the background.
+
+        Args:
+            index_name: name of index to add
+            table: table to add index to
+            columns: columns/expressions to include in index
+            where_clause: A WHERE clause to specify a partial unique index.
+            unique: true to make a UNIQUE index
+            psql_only: true to only create this index on psql databases (useful
+                for virtual sqlite tables)
+            replaces_index: The name of an index that this index replaces.
+                The named index will be dropped upon completion of the new index.
+        """
+
         def create_index_psql(conn: Connection) -> None:
             conn.rollback()
             # postgres insists on autocommit for the index
@@ -618,16 +660,11 @@ class BackgroundUpdater:
         else:
             runner = create_index_sqlite
 
-        async def updater(progress: JsonDict, batch_size: int) -> int:
-            if runner is not None:
-                logger.info("Adding index %s to %s", index_name, table)
-                await self.db_pool.runWithConnection(runner)
-            await self._end_background_update(update_name)
-            return 1
-
-        self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
-            updater, oneshot=True
-        )
+        if runner is None:
+            return
+
+        logger.info("Adding index %s to %s", index_name, table)
+        await self.db_pool.runWithConnection(runner)
 
     async def _end_background_update(self, update_name: str) -> None:
         """Removes a completed background update task from the queue.

synapse/storage/databases/main/receipts.py

@@ -924,39 +924,6 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
 
         return batch_size
 
-    async def _create_receipts_index(self, index_name: str, table: str) -> None:
-        """Adds a unique index on `(room_id, receipt_type, user_id)` to the given
-        receipts table, for non-thread receipts."""
-
-        def _create_index(conn: LoggingDatabaseConnection) -> None:
-            conn.rollback()
-
-            # we have to set autocommit, because postgres refuses to
-            # CREATE INDEX CONCURRENTLY without it.
-            if isinstance(self.database_engine, PostgresEngine):
-                conn.set_session(autocommit=True)
-
-            try:
-                c = conn.cursor()
-
-                # Now that the duplicates are gone, we can create the index.
-                concurrently = (
-                    "CONCURRENTLY"
-                    if isinstance(self.database_engine, PostgresEngine)
-                    else ""
-                )
-                sql = f"""
-                    CREATE UNIQUE INDEX {concurrently} {index_name}
-                    ON {table}(room_id, receipt_type, user_id)
-                    WHERE thread_id IS NULL
-                """
-                c.execute(sql)
-            finally:
-                if isinstance(self.database_engine, PostgresEngine):
-                    conn.set_session(autocommit=False)
-
-        await self.db_pool.runWithConnection(_create_index)
-
     async def _background_receipts_linearized_unique_index(
         self, progress: dict, batch_size: int
     ) -> int:
@@ -999,9 +966,12 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
             _remote_duplicate_receipts_txn,
         )
 
-        await self._create_receipts_index(
-            "receipts_linearized_unique_index",
-            "receipts_linearized",
+        await self.db_pool.updates.create_index_in_background(
+            index_name="receipts_linearized_unique_index",
+            table="receipts_linearized",
+            columns=["room_id", "receipt_type", "user_id"],
+            where_clause="thread_id IS NULL",
+            unique=True,
         )
 
         await self.db_pool.updates._end_background_update(
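
On postgres, the call above should issue essentially the same statement the deleted `_create_receipts_index` helper built by hand; roughly (illustrative, the SQL is assembled inside `create_index_in_background`):

# Approximately the SQL produced for the call above (illustration only).
sql = """
    CREATE UNIQUE INDEX CONCURRENTLY receipts_linearized_unique_index
    ON receipts_linearized (room_id, receipt_type, user_id)
    WHERE thread_id IS NULL
"""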
@@ -1050,9 +1020,12 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
             _remote_duplicate_receipts_txn,
         )
 
-        await self._create_receipts_index(
-            "receipts_graph_unique_index",
-            "receipts_graph",
+        await self.db_pool.updates.create_index_in_background(
+            index_name="receipts_graph_unique_index",
+            table="receipts_graph",
+            columns=["room_id", "receipt_type", "user_id"],
+            where_clause="thread_id IS NULL",
+            unique=True,
         )
 
         await self.db_pool.updates._end_background_update(