mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2024-12-28 23:06:10 -05:00
Attempt to delete more duplicate rows in receipts_linearized table. (#14915)
The previous assumption was that the stream_id column was unique (for a room ID, receipt type, user ID tuple), but this turned out to be incorrect. Now find the max stream ID, then map this back to a database-specific row identifier and delete other rows which match the (room ID, receipt type, user ID) tuple, but *not* the row ID.
This commit is contained in:
parent
bb675913f0
commit
230a831c73
1
changelog.d/14915.bugfix
Normal file
1
changelog.d/14915.bugfix
Normal file
@ -0,0 +1 @@
|
|||||||
|
Fix a bug introduced in Synapse 1.70.0 where the background updates to add non-thread unique indexes on receipts could fail when upgrading from 1.67.0 or earlier.
|
@ -941,10 +941,14 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
|
|||||||
receipts."""
|
receipts."""
|
||||||
|
|
||||||
def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
|
def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
|
||||||
|
if isinstance(self.database_engine, PostgresEngine):
|
||||||
|
ROW_ID_NAME = "ctid"
|
||||||
|
else:
|
||||||
|
ROW_ID_NAME = "rowid"
|
||||||
|
|
||||||
# Identify any duplicate receipts arising from
|
# Identify any duplicate receipts arising from
|
||||||
# https://github.com/matrix-org/synapse/issues/14406.
|
# https://github.com/matrix-org/synapse/issues/14406.
|
||||||
# We expect the following query to use the per-thread receipt index and take
|
# The following query takes less than a minute on matrix.org.
|
||||||
# less than a minute.
|
|
||||||
sql = """
|
sql = """
|
||||||
SELECT MAX(stream_id), room_id, receipt_type, user_id
|
SELECT MAX(stream_id), room_id, receipt_type, user_id
|
||||||
FROM receipts_linearized
|
FROM receipts_linearized
|
||||||
@ -956,19 +960,33 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
|
|||||||
duplicate_keys = cast(List[Tuple[int, str, str, str]], list(txn))
|
duplicate_keys = cast(List[Tuple[int, str, str, str]], list(txn))
|
||||||
|
|
||||||
# Then remove duplicate receipts, keeping the one with the highest
|
# Then remove duplicate receipts, keeping the one with the highest
|
||||||
# `stream_id`. There should only be a single receipt with any given
|
# `stream_id`. Since there might be duplicate rows with the same
|
||||||
# `stream_id`.
|
# `stream_id`, we delete by the ctid instead.
|
||||||
for max_stream_id, room_id, receipt_type, user_id in duplicate_keys:
|
for stream_id, room_id, receipt_type, user_id in duplicate_keys:
|
||||||
sql = """
|
sql = f"""
|
||||||
|
SELECT {ROW_ID_NAME}
|
||||||
|
FROM receipts_linearized
|
||||||
|
WHERE
|
||||||
|
room_id = ? AND
|
||||||
|
receipt_type = ? AND
|
||||||
|
user_id = ? AND
|
||||||
|
thread_id IS NULL AND
|
||||||
|
stream_id = ?
|
||||||
|
LIMIT 1
|
||||||
|
"""
|
||||||
|
txn.execute(sql, (room_id, receipt_type, user_id, stream_id))
|
||||||
|
row_id = cast(Tuple[str], txn.fetchone())[0]
|
||||||
|
|
||||||
|
sql = f"""
|
||||||
DELETE FROM receipts_linearized
|
DELETE FROM receipts_linearized
|
||||||
WHERE
|
WHERE
|
||||||
room_id = ? AND
|
room_id = ? AND
|
||||||
receipt_type = ? AND
|
receipt_type = ? AND
|
||||||
user_id = ? AND
|
user_id = ? AND
|
||||||
thread_id IS NULL AND
|
thread_id IS NULL AND
|
||||||
stream_id < ?
|
{ROW_ID_NAME} != ?
|
||||||
"""
|
"""
|
||||||
txn.execute(sql, (room_id, receipt_type, user_id, max_stream_id))
|
txn.execute(sql, (room_id, receipt_type, user_id, row_id))
|
||||||
|
|
||||||
await self.db_pool.runInteraction(
|
await self.db_pool.runInteraction(
|
||||||
self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME,
|
self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME,
|
||||||
|
@ -168,7 +168,9 @@ class ReceiptsBackgroundUpdateStoreTestCase(HomeserverTestCase):
|
|||||||
{"stream_id": 6, "event_id": "$some_event"},
|
{"stream_id": 6, "event_id": "$some_event"},
|
||||||
],
|
],
|
||||||
(self.other_room_id, "m.read", self.user_id): [
|
(self.other_room_id, "m.read", self.user_id): [
|
||||||
{"stream_id": 7, "event_id": "$some_event"}
|
# It is possible for stream IDs to be duplicated.
|
||||||
|
{"stream_id": 7, "event_id": "$some_event"},
|
||||||
|
{"stream_id": 7, "event_id": "$some_event"},
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
expected_unique_receipts={
|
expected_unique_receipts={
|
||||||
|
Loading…
Reference in New Issue
Block a user