Revert "Add event_stream_ordering column to membership state tables (#14979)"

This reverts commit 5fdc12f482.
This commit is contained in:
David Robertson 2023-02-07 15:26:55 +00:00
parent f630536a94
commit 9cd7610f86
No known key found for this signature in database
GPG Key ID: 903ECE108A39DEDD
4 changed files with 11 additions and 145 deletions

View File

@ -1147,15 +1147,11 @@ class PersistEventsStore:
# been inserted into room_memberships. # been inserted into room_memberships.
txn.execute_batch( txn.execute_batch(
"""INSERT INTO current_state_events """INSERT INTO current_state_events
(room_id, type, state_key, event_id, membership, event_stream_ordering) (room_id, type, state_key, event_id, membership)
VALUES ( VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
?, ?, ?, ?,
(SELECT membership FROM room_memberships WHERE event_id = ?),
(SELECT stream_ordering FROM events WHERE event_id = ?)
)
""", """,
[ [
(room_id, key[0], key[1], ev_id, ev_id, ev_id) (room_id, key[0], key[1], ev_id, ev_id)
for key, ev_id in to_insert.items() for key, ev_id in to_insert.items()
], ],
) )
@ -1182,15 +1178,11 @@ class PersistEventsStore:
if to_insert: if to_insert:
txn.execute_batch( txn.execute_batch(
"""INSERT INTO local_current_membership """INSERT INTO local_current_membership
(room_id, user_id, event_id, membership, event_stream_ordering) (room_id, user_id, event_id, membership)
VALUES ( VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
?, ?, ?,
(SELECT membership FROM room_memberships WHERE event_id = ?),
(SELECT stream_ordering FROM events WHERE event_id = ?)
)
""", """,
[ [
(room_id, key[1], ev_id, ev_id, ev_id) (room_id, key[1], ev_id, ev_id)
for key, ev_id in to_insert.items() for key, ev_id in to_insert.items()
if key[0] == EventTypes.Member and self.is_mine_id(key[1]) if key[0] == EventTypes.Member and self.is_mine_id(key[1])
], ],
@ -1798,7 +1790,6 @@ class PersistEventsStore:
table="room_memberships", table="room_memberships",
keys=( keys=(
"event_id", "event_id",
"event_stream_ordering",
"user_id", "user_id",
"sender", "sender",
"room_id", "room_id",
@ -1809,7 +1800,6 @@ class PersistEventsStore:
values=[ values=[
( (
event.event_id, event.event_id,
event.internal_metadata.stream_ordering,
event.state_key, event.state_key,
event.user_id, event.user_id,
event.room_id, event.room_id,
@ -1842,7 +1832,6 @@ class PersistEventsStore:
keyvalues={"room_id": event.room_id, "user_id": event.state_key}, keyvalues={"room_id": event.room_id, "user_id": event.state_key},
values={ values={
"event_id": event.event_id, "event_id": event.event_id,
"event_stream_ordering": event.internal_metadata.stream_ordering,
"membership": event.membership, "membership": event.membership,
}, },
) )

View File

@ -17,7 +17,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Set, Tuple, ca
import attr import attr
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes from synapse.api.constants import EventContentFields, RelationTypes
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import make_event_from_dict from synapse.events import make_event_from_dict
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@ -71,10 +71,6 @@ class _BackgroundUpdates:
EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index" EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index"
POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING = (
"populate_membership_event_stream_ordering"
)
@attr.s(slots=True, frozen=True, auto_attribs=True) @attr.s(slots=True, frozen=True, auto_attribs=True)
class _CalculateChainCover: class _CalculateChainCover:
@ -103,10 +99,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
): ):
super().__init__(database, db_conn, hs) super().__init__(database, db_conn, hs)
self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING,
self._populate_membership_event_stream_ordering,
)
self.db_pool.updates.register_background_update_handler( self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME,
self._background_reindex_origin_server_ts, self._background_reindex_origin_server_ts,
@ -1506,97 +1498,3 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
) )
return batch_size return batch_size
async def _populate_membership_event_stream_ordering(
self, progress: JsonDict, batch_size: int
) -> int:
def _populate_membership_event_stream_ordering(
txn: LoggingTransaction,
) -> bool:
if "max_stream_ordering" in progress:
max_stream_ordering = progress["max_stream_ordering"]
else:
txn.execute("SELECT max(stream_ordering) FROM events")
res = txn.fetchone()
if res is None or res[0] is None:
return True
else:
max_stream_ordering = res[0]
start = progress.get("stream_ordering", 0)
stop = start + batch_size
sql = f"""
SELECT room_id, event_id, stream_ordering
FROM events
WHERE
type = '{EventTypes.Member}'
AND stream_ordering >= ?
AND stream_ordering < ?
"""
txn.execute(sql, (start, stop))
rows: List[Tuple[str, str, int]] = cast(
List[Tuple[str, str, int]], txn.fetchall()
)
event_ids: List[Tuple[str]] = []
event_stream_orderings: List[Tuple[int]] = []
for _, event_id, event_stream_ordering in rows:
event_ids.append((event_id,))
event_stream_orderings.append((event_stream_ordering,))
self.db_pool.simple_update_many_txn(
txn,
table="current_state_events",
key_names=("event_id",),
key_values=event_ids,
value_names=("event_stream_ordering",),
value_values=event_stream_orderings,
)
self.db_pool.simple_update_many_txn(
txn,
table="room_memberships",
key_names=("event_id",),
key_values=event_ids,
value_names=("event_stream_ordering",),
value_values=event_stream_orderings,
)
# NOTE: local_current_membership has no index on event_id, so only
# the room ID here will reduce the query rows read.
for room_id, event_id, event_stream_ordering in rows:
txn.execute(
"""
UPDATE local_current_membership
SET event_stream_ordering = ?
WHERE room_id = ? AND event_id = ?
""",
(event_stream_ordering, room_id, event_id),
)
self.db_pool.updates._background_update_progress_txn(
txn,
_BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING,
{
"stream_ordering": stop,
"max_stream_ordering": max_stream_ordering,
},
)
return stop > max_stream_ordering
finished = await self.db_pool.runInteraction(
"_populate_membership_event_stream_ordering",
_populate_membership_event_stream_ordering,
)
if finished:
await self.db_pool.updates._end_background_update(
_BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING
)
return batch_size

View File

@ -1779,7 +1779,7 @@ class EventsWorkerStore(SQLBaseStore):
txn: LoggingTransaction, txn: LoggingTransaction,
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]: ) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
sql = ( sql = (
"SELECT out.event_stream_ordering, e.event_id, e.room_id, e.type," "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL," " se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
" e.outlier" " e.outlier"
" FROM events AS e" " FROM events AS e"
@ -1791,10 +1791,10 @@ class EventsWorkerStore(SQLBaseStore):
" LEFT JOIN event_relations USING (event_id)" " LEFT JOIN event_relations USING (event_id)"
" LEFT JOIN room_memberships USING (event_id)" " LEFT JOIN room_memberships USING (event_id)"
" LEFT JOIN rejections USING (event_id)" " LEFT JOIN rejections USING (event_id)"
" WHERE ? < out.event_stream_ordering" " WHERE ? < event_stream_ordering"
" AND out.event_stream_ordering <= ?" " AND event_stream_ordering <= ?"
" AND out.instance_name = ?" " AND out.instance_name = ?"
" ORDER BY out.event_stream_ordering ASC" " ORDER BY event_stream_ordering ASC"
) )
txn.execute(sql, (last_id, current_id, instance_name)) txn.execute(sql, (last_id, current_id, instance_name))

View File

@ -1,21 +0,0 @@
/* Copyright 2022 Beeper
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT;
ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT;
ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT;
INSERT INTO background_updates (update_name, progress_json) VALUES
('populate_membership_event_stream_ordering', '{}');