Merge remote-tracking branch 'upstream/release-v1.76'

This commit is contained in:
Tulir Asokan 2023-01-27 13:51:43 +02:00
commit f7181c8a2d
7 changed files with 79 additions and 19 deletions

View file

@ -1819,7 +1819,7 @@ class DatabasePool:
keyvalues: Optional[Dict[str, Any]] = None,
desc: str = "simple_select_many_batch",
batch_size: int = 100,
) -> List[Any]:
) -> List[Dict[str, Any]]:
"""Executes a SELECT query on the named table, which may return zero or
more rows, returning the result as a list of dicts.

View file

@ -99,6 +99,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
("user_signature_stream", "stream_id"),
("device_lists_outbound_pokes", "stream_id"),
("device_lists_changes_in_room", "stream_id"),
("device_lists_remote_pending", "stream_id"),
],
is_writer=hs.config.worker.worker_app is None,
)

View file

@ -60,9 +60,9 @@ from synapse.storage.util.id_generators import (
MultiWriterIdGenerator,
StreamIdGenerator,
)
from synapse.types import JsonDict, RetentionPolicy, ThirdPartyInstanceID
from synapse.types import JsonDict, RetentionPolicy, StrCollection, ThirdPartyInstanceID
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.stringutils import MXC_REGEX
if TYPE_CHECKING:
@ -1255,7 +1255,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
return room_servers
@cached()
@cached(max_entries=10000)
async def is_partial_state_room(self, room_id: str) -> bool:
"""Checks if this room has partial state.
@ -1274,6 +1274,27 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
return entry is not None
@cachedList(cached_method_name="is_partial_state_room", list_name="room_ids")
async def is_partial_state_room_batched(
self, room_ids: StrCollection
) -> Mapping[str, bool]:
"""Checks if the given rooms have partial state.
Returns true for "partial-state" rooms, which means that the state
at events in the room, and `current_state_events`, may not yet be
complete.
"""
rows: List[Dict[str, str]] = await self.db_pool.simple_select_many_batch(
table="partial_state_rooms",
column="room_id",
iterable=room_ids,
retcols=("room_id",),
desc="is_partial_state_room_batched",
)
partial_state_rooms = {row_dict["room_id"] for row_dict in rows}
return {room_id: room_id in partial_state_rooms for room_id in room_ids}
async def get_join_event_id_and_device_lists_stream_id_for_partial_state(
self, room_id: str
) -> Tuple[str, int]: