mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-07-26 23:55:15 -04:00
Faster remote room joins: stream the un-partial-stating of rooms over replication. [rei:frrj/streams/unpsr] (#14473)
This commit is contained in:
parent
e1779bc69f
commit
501f62d1a6
8 changed files with 280 additions and 67 deletions
|
@ -1,5 +1,5 @@
|
|||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
# Copyright 2019, 2022 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
|
@ -50,8 +50,14 @@ from synapse.storage.database import (
|
|||
LoggingTransaction,
|
||||
)
|
||||
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.storage.types import Cursor
|
||||
from synapse.storage.util.id_generators import IdGenerator
|
||||
from synapse.storage.util.id_generators import (
|
||||
AbstractStreamIdGenerator,
|
||||
IdGenerator,
|
||||
MultiWriterIdGenerator,
|
||||
StreamIdGenerator,
|
||||
)
|
||||
from synapse.types import JsonDict, RetentionPolicy, ThirdPartyInstanceID
|
||||
from synapse.util import json_encoder
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
@ -114,6 +120,26 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
|||
|
||||
self.config: HomeServerConfig = hs.config
|
||||
|
||||
self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator
|
||||
|
||||
if isinstance(database.engine, PostgresEngine):
|
||||
self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
|
||||
db_conn=db_conn,
|
||||
db=database,
|
||||
stream_name="un_partial_stated_room_stream",
|
||||
instance_name=self._instance_name,
|
||||
tables=[
|
||||
("un_partial_stated_room_stream", "instance_name", "stream_id")
|
||||
],
|
||||
sequence_name="un_partial_stated_room_stream_sequence",
|
||||
# TODO(faster_joins, multiple writers) Support multiple writers.
|
||||
writers=["master"],
|
||||
)
|
||||
else:
|
||||
self._un_partial_stated_rooms_stream_id_gen = StreamIdGenerator(
|
||||
db_conn, "un_partial_stated_room_stream", "stream_id"
|
||||
)
|
||||
|
||||
async def store_room(
|
||||
self,
|
||||
room_id: str,
|
||||
|
@ -1216,70 +1242,6 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
|||
|
||||
return room_servers
|
||||
|
||||
async def clear_partial_state_room(self, room_id: str) -> bool:
|
||||
"""Clears the partial state flag for a room.
|
||||
|
||||
Args:
|
||||
room_id: The room whose partial state flag is to be cleared.
|
||||
|
||||
Returns:
|
||||
`True` if the partial state flag has been cleared successfully.
|
||||
|
||||
`False` if the partial state flag could not be cleared because the room
|
||||
still contains events with partial state.
|
||||
"""
|
||||
try:
|
||||
await self.db_pool.runInteraction(
|
||||
"clear_partial_state_room", self._clear_partial_state_room_txn, room_id
|
||||
)
|
||||
return True
|
||||
except self.db_pool.engine.module.IntegrityError as e:
|
||||
# Assume that any `IntegrityError`s are due to partial state events.
|
||||
logger.info(
|
||||
"Exception while clearing lazy partial-state-room %s, retrying: %s",
|
||||
room_id,
|
||||
e,
|
||||
)
|
||||
return False
|
||||
|
||||
def _clear_partial_state_room_txn(
|
||||
self, txn: LoggingTransaction, room_id: str
|
||||
) -> None:
|
||||
DatabasePool.simple_delete_txn(
|
||||
txn,
|
||||
table="partial_state_rooms_servers",
|
||||
keyvalues={"room_id": room_id},
|
||||
)
|
||||
DatabasePool.simple_delete_one_txn(
|
||||
txn,
|
||||
table="partial_state_rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
)
|
||||
self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_partial_state_servers_at_join, (room_id,)
|
||||
)
|
||||
|
||||
# We now delete anything from `device_lists_remote_pending` with a
|
||||
# stream ID less than the minimum
|
||||
# `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
|
||||
device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="partial_state_rooms",
|
||||
keyvalues={},
|
||||
retcol="MIN(device_lists_stream_id)",
|
||||
allow_none=True,
|
||||
)
|
||||
if device_lists_stream_id is None:
|
||||
# There are no rooms being currently partially joined, so we delete everything.
|
||||
txn.execute("DELETE FROM device_lists_remote_pending")
|
||||
else:
|
||||
sql = """
|
||||
DELETE FROM device_lists_remote_pending
|
||||
WHERE stream_id <= ?
|
||||
"""
|
||||
txn.execute(sql, (device_lists_stream_id,))
|
||||
|
||||
@cached()
|
||||
async def is_partial_state_room(self, room_id: str) -> bool:
|
||||
"""Checks if this room has partial state.
|
||||
|
@ -1315,6 +1277,66 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
|||
)
|
||||
return result["join_event_id"], result["device_lists_stream_id"]
|
||||
|
||||
def get_un_partial_stated_rooms_token(self) -> int:
|
||||
# TODO(faster_joins, multiple writers): This is inappropriate if there
|
||||
# are multiple writers because workers that don't write often will
|
||||
# hold all readers up.
|
||||
# (See `MultiWriterIdGenerator.get_persisted_upto_position` for an
|
||||
# explanation.)
|
||||
return self._un_partial_stated_rooms_stream_id_gen.get_current_token()
|
||||
|
||||
async def get_un_partial_stated_rooms_from_stream(
|
||||
self, instance_name: str, last_id: int, current_id: int, limit: int
|
||||
) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
|
||||
"""Get updates for caches replication stream.
|
||||
|
||||
Args:
|
||||
instance_name: The writer we want to fetch updates from. Unused
|
||||
here since there is only ever one writer.
|
||||
last_id: The token to fetch updates from. Exclusive.
|
||||
current_id: The token to fetch updates up to. Inclusive.
|
||||
limit: The requested limit for the number of rows to return. The
|
||||
function may return more or fewer rows.
|
||||
|
||||
Returns:
|
||||
A tuple consisting of: the updates, a token to use to fetch
|
||||
subsequent updates, and whether we returned fewer rows than exists
|
||||
between the requested tokens due to the limit.
|
||||
|
||||
The token returned can be used in a subsequent call to this
|
||||
function to get further updatees.
|
||||
|
||||
The updates are a list of 2-tuples of stream ID and the row data
|
||||
"""
|
||||
|
||||
if last_id == current_id:
|
||||
return [], current_id, False
|
||||
|
||||
def get_un_partial_stated_rooms_from_stream_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
|
||||
sql = """
|
||||
SELECT stream_id, room_id
|
||||
FROM un_partial_stated_room_stream
|
||||
WHERE ? < stream_id AND stream_id <= ? AND instance_name = ?
|
||||
ORDER BY stream_id ASC
|
||||
LIMIT ?
|
||||
"""
|
||||
txn.execute(sql, (last_id, current_id, instance_name, limit))
|
||||
updates = [(row[0], (row[1],)) for row in txn]
|
||||
limited = False
|
||||
upto_token = current_id
|
||||
if len(updates) >= limit:
|
||||
upto_token = updates[-1][0]
|
||||
limited = True
|
||||
|
||||
return updates, upto_token, limited
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_un_partial_stated_rooms_from_stream",
|
||||
get_un_partial_stated_rooms_from_stream_txn,
|
||||
)
|
||||
|
||||
|
||||
class _BackgroundUpdates:
|
||||
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
|
||||
|
@ -1806,6 +1828,8 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
|
|||
|
||||
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
|
||||
|
||||
self._instance_name = hs.get_instance_name()
|
||||
|
||||
async def upsert_room_on_join(
|
||||
self, room_id: str, room_version: RoomVersion, state_events: List[EventBase]
|
||||
) -> None:
|
||||
|
@ -2270,3 +2294,84 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
|
|||
self.is_room_blocked,
|
||||
(room_id,),
|
||||
)
|
||||
|
||||
async def clear_partial_state_room(self, room_id: str) -> bool:
|
||||
"""Clears the partial state flag for a room.
|
||||
|
||||
Args:
|
||||
room_id: The room whose partial state flag is to be cleared.
|
||||
|
||||
Returns:
|
||||
`True` if the partial state flag has been cleared successfully.
|
||||
|
||||
`False` if the partial state flag could not be cleared because the room
|
||||
still contains events with partial state.
|
||||
"""
|
||||
try:
|
||||
async with self._un_partial_stated_rooms_stream_id_gen.get_next() as un_partial_state_room_stream_id:
|
||||
await self.db_pool.runInteraction(
|
||||
"clear_partial_state_room",
|
||||
self._clear_partial_state_room_txn,
|
||||
room_id,
|
||||
un_partial_state_room_stream_id,
|
||||
)
|
||||
return True
|
||||
except self.db_pool.engine.module.IntegrityError as e:
|
||||
# Assume that any `IntegrityError`s are due to partial state events.
|
||||
logger.info(
|
||||
"Exception while clearing lazy partial-state-room %s, retrying: %s",
|
||||
room_id,
|
||||
e,
|
||||
)
|
||||
return False
|
||||
|
||||
def _clear_partial_state_room_txn(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
room_id: str,
|
||||
un_partial_state_room_stream_id: int,
|
||||
) -> None:
|
||||
DatabasePool.simple_delete_txn(
|
||||
txn,
|
||||
table="partial_state_rooms_servers",
|
||||
keyvalues={"room_id": room_id},
|
||||
)
|
||||
DatabasePool.simple_delete_one_txn(
|
||||
txn,
|
||||
table="partial_state_rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
)
|
||||
self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_partial_state_servers_at_join, (room_id,)
|
||||
)
|
||||
|
||||
DatabasePool.simple_insert_txn(
|
||||
txn,
|
||||
"un_partial_stated_room_stream",
|
||||
{
|
||||
"stream_id": un_partial_state_room_stream_id,
|
||||
"instance_name": self._instance_name,
|
||||
"room_id": room_id,
|
||||
},
|
||||
)
|
||||
|
||||
# We now delete anything from `device_lists_remote_pending` with a
|
||||
# stream ID less than the minimum
|
||||
# `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
|
||||
device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="partial_state_rooms",
|
||||
keyvalues={},
|
||||
retcol="MIN(device_lists_stream_id)",
|
||||
allow_none=True,
|
||||
)
|
||||
if device_lists_stream_id is None:
|
||||
# There are no rooms being currently partially joined, so we delete everything.
|
||||
txn.execute("DELETE FROM device_lists_remote_pending")
|
||||
else:
|
||||
sql = """
|
||||
DELETE FROM device_lists_remote_pending
|
||||
WHERE stream_id <= ?
|
||||
"""
|
||||
txn.execute(sql, (device_lists_stream_id,))
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue