mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-01-22 00:11:01 -05:00
230a831c73
The previous assumption was that the stream_id column was unique (for a room ID, receipt type, user ID tuple), but this turned out to be incorrect. Now find the max stream ID, then map this back to a database-specific row identifier and delete other rows which match the (room ID, receipt type, user ID) tuple, but *not* the row ID.
212 lines
7.9 KiB
Python
212 lines
7.9 KiB
Python
# Copyright 2022 The Matrix.org Foundation C.I.C.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the 'License');
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an 'AS IS' BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from typing import Any, Dict, Optional, Sequence, Tuple
|
|
|
|
from twisted.test.proto_helpers import MemoryReactor
|
|
|
|
from synapse.rest import admin
|
|
from synapse.rest.client import login, room
|
|
from synapse.server import HomeServer
|
|
from synapse.storage.database import LoggingTransaction
|
|
from synapse.util import Clock
|
|
|
|
from tests.unittest import HomeserverTestCase
|
|
|
|
|
|
class ReceiptsBackgroundUpdateStoreTestCase(HomeserverTestCase):
|
|
|
|
servlets = [
|
|
admin.register_servlets,
|
|
room.register_servlets,
|
|
login.register_servlets,
|
|
]
|
|
|
|
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
|
|
self.store = hs.get_datastores().main
|
|
self.user_id = self.register_user("foo", "pass")
|
|
self.token = self.login("foo", "pass")
|
|
self.room_id = self.helper.create_room_as(self.user_id, tok=self.token)
|
|
self.other_room_id = self.helper.create_room_as(self.user_id, tok=self.token)
|
|
|
|
def _test_background_receipts_unique_index(
|
|
self,
|
|
update_name: str,
|
|
index_name: str,
|
|
table: str,
|
|
receipts: Dict[Tuple[str, str, str], Sequence[Dict[str, Any]]],
|
|
expected_unique_receipts: Dict[Tuple[str, str, str], Optional[Dict[str, Any]]],
|
|
) -> None:
|
|
"""Test that the background update to uniqueify non-thread receipts in
|
|
the given receipts table works properly.
|
|
|
|
Args:
|
|
update_name: The name of the background update to test.
|
|
index_name: The name of the index that the background update creates.
|
|
table: The table of receipts that the background update fixes.
|
|
receipts: The test data containing duplicate receipts.
|
|
A list of receipt rows to insert, grouped by
|
|
`(room_id, receipt_type, user_id)`.
|
|
expected_unique_receipts: A dictionary of `(room_id, receipt_type, user_id)`
|
|
keys and expected receipt key-values after duplicate receipts have been
|
|
removed.
|
|
"""
|
|
# First, undo the background update.
|
|
def drop_receipts_unique_index(txn: LoggingTransaction) -> None:
|
|
txn.execute(f"DROP INDEX IF EXISTS {index_name}")
|
|
|
|
self.get_success(
|
|
self.store.db_pool.runInteraction(
|
|
"drop_receipts_unique_index",
|
|
drop_receipts_unique_index,
|
|
)
|
|
)
|
|
|
|
# Populate the receipts table, including duplicates.
|
|
for (room_id, receipt_type, user_id), rows in receipts.items():
|
|
for row in rows:
|
|
self.get_success(
|
|
self.store.db_pool.simple_insert(
|
|
table,
|
|
{
|
|
"room_id": room_id,
|
|
"receipt_type": receipt_type,
|
|
"user_id": user_id,
|
|
"thread_id": None,
|
|
"data": "{}",
|
|
**row,
|
|
},
|
|
)
|
|
)
|
|
|
|
# Insert and run the background update.
|
|
self.get_success(
|
|
self.store.db_pool.simple_insert(
|
|
"background_updates",
|
|
{
|
|
"update_name": update_name,
|
|
"progress_json": "{}",
|
|
},
|
|
)
|
|
)
|
|
|
|
self.store.db_pool.updates._all_done = False
|
|
|
|
self.wait_for_background_updates()
|
|
|
|
# Check that the remaining receipts match expectations.
|
|
for (
|
|
room_id,
|
|
receipt_type,
|
|
user_id,
|
|
), expected_row in expected_unique_receipts.items():
|
|
# Include the receipt key in the returned columns, for more informative
|
|
# assertion messages.
|
|
columns = ["room_id", "receipt_type", "user_id"]
|
|
if expected_row is not None:
|
|
columns += expected_row.keys()
|
|
|
|
rows = self.get_success(
|
|
self.store.db_pool.simple_select_list(
|
|
table=table,
|
|
keyvalues={
|
|
"room_id": room_id,
|
|
"receipt_type": receipt_type,
|
|
"user_id": user_id,
|
|
# `simple_select_onecol` does not support NULL filters,
|
|
# so skip the filter on `thread_id`.
|
|
},
|
|
retcols=columns,
|
|
desc="get_receipt",
|
|
)
|
|
)
|
|
|
|
if expected_row is not None:
|
|
self.assertEqual(
|
|
len(rows),
|
|
1,
|
|
f"Background update did not leave behind latest receipt in {table}",
|
|
)
|
|
self.assertEqual(
|
|
rows[0],
|
|
{
|
|
"room_id": room_id,
|
|
"receipt_type": receipt_type,
|
|
"user_id": user_id,
|
|
**expected_row,
|
|
},
|
|
)
|
|
else:
|
|
self.assertEqual(
|
|
len(rows),
|
|
0,
|
|
f"Background update did not remove all duplicate receipts from {table}",
|
|
)
|
|
|
|
def test_background_receipts_linearized_unique_index(self) -> None:
|
|
"""Test that the background update to uniqueify non-thread receipts in
|
|
`receipts_linearized` works properly.
|
|
"""
|
|
self._test_background_receipts_unique_index(
|
|
"receipts_linearized_unique_index",
|
|
"receipts_linearized_unique_index",
|
|
"receipts_linearized",
|
|
receipts={
|
|
(self.room_id, "m.read", self.user_id): [
|
|
{"stream_id": 5, "event_id": "$some_event"},
|
|
{"stream_id": 6, "event_id": "$some_event"},
|
|
],
|
|
(self.other_room_id, "m.read", self.user_id): [
|
|
# It is possible for stream IDs to be duplicated.
|
|
{"stream_id": 7, "event_id": "$some_event"},
|
|
{"stream_id": 7, "event_id": "$some_event"},
|
|
],
|
|
},
|
|
expected_unique_receipts={
|
|
(self.room_id, "m.read", self.user_id): {"stream_id": 6},
|
|
(self.other_room_id, "m.read", self.user_id): {"stream_id": 7},
|
|
},
|
|
)
|
|
|
|
def test_background_receipts_graph_unique_index(self) -> None:
|
|
"""Test that the background update to uniqueify non-thread receipts in
|
|
`receipts_graph` works properly.
|
|
"""
|
|
self._test_background_receipts_unique_index(
|
|
"receipts_graph_unique_index",
|
|
"receipts_graph_unique_index",
|
|
"receipts_graph",
|
|
receipts={
|
|
(self.room_id, "m.read", self.user_id): [
|
|
{
|
|
"event_ids": '["$some_event"]',
|
|
},
|
|
{
|
|
"event_ids": '["$some_event"]',
|
|
},
|
|
],
|
|
(self.other_room_id, "m.read", self.user_id): [
|
|
{
|
|
"event_ids": '["$some_event"]',
|
|
}
|
|
],
|
|
},
|
|
expected_unique_receipts={
|
|
(self.room_id, "m.read", self.user_id): None,
|
|
(self.other_room_id, "m.read", self.user_id): {
|
|
"event_ids": '["$some_event"]'
|
|
},
|
|
},
|
|
)
|