Fix performance of device lists in /key/changes and sliding sync (#17537)

We do this by reusing the code from sync v2.

Reviewable commit-by-commit. The function `get_user_ids_changed` has
been rewritten entirely, so I would recommend not looking at the diff.
This commit is contained in:
Erik Johnston 2024-08-09 11:59:44 +01:00 committed by GitHub
parent f31360e34b
commit 70b0e38603
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 213 additions and 203 deletions

1
changelog.d/17537.misc Normal file
View File

@ -0,0 +1 @@
Fix performance of device lists in `/key/changes` and sliding sync.

View File

@ -20,10 +20,20 @@
# #
# #
import logging import logging
from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping, Optional, Set, Tuple from typing import (
TYPE_CHECKING,
AbstractSet,
Dict,
Iterable,
List,
Mapping,
Optional,
Set,
Tuple,
)
from synapse.api import errors from synapse.api import errors
from synapse.api.constants import EduTypes, EventTypes from synapse.api.constants import EduTypes, EventTypes, Membership
from synapse.api.errors import ( from synapse.api.errors import (
Codes, Codes,
FederationDeniedError, FederationDeniedError,
@ -38,6 +48,7 @@ from synapse.metrics.background_process_metrics import (
wrap_as_background_process, wrap_as_background_process,
) )
from synapse.storage.databases.main.client_ips import DeviceLastConnectionInfo from synapse.storage.databases.main.client_ips import DeviceLastConnectionInfo
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.types import ( from synapse.types import (
DeviceListUpdates, DeviceListUpdates,
JsonDict, JsonDict,
@ -222,129 +233,115 @@ class DeviceWorkerHandler:
set_tag("user_id", user_id) set_tag("user_id", user_id)
set_tag("from_token", str(from_token)) set_tag("from_token", str(from_token))
now_room_key = self.store.get_room_max_token()
room_ids = await self.store.get_rooms_for_user(user_id) now_token = self._event_sources.get_current_token()
changed = await self.get_device_changes_in_shared_rooms( # We need to work out all the different membership changes for the user
user_id, room_ids, from_token # and user they share a room with, to pass to
# `generate_sync_entry_for_device_list`. See its docstring for details
# on the data required.
joined_room_ids = await self.store.get_rooms_for_user(user_id)
# Get the set of rooms that the user has joined/left
membership_changes = (
await self.store.get_current_state_delta_membership_changes_for_user(
user_id, from_key=from_token.room_key, to_key=now_token.room_key
)
) )
# Then work out if any users have since joined # Check for newly joined or left rooms. We need to make sure that we add
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) # to newly joined in the case membership goes from join -> leave -> join
# again.
newly_joined_rooms: Set[str] = set()
newly_left_rooms: Set[str] = set()
for change in membership_changes:
# We check for changes in "joinedness", i.e. if the membership has
# changed to or from JOIN.
if change.membership == Membership.JOIN:
if change.prev_membership != Membership.JOIN:
newly_joined_rooms.add(change.room_id)
newly_left_rooms.discard(change.room_id)
elif change.prev_membership == Membership.JOIN:
newly_joined_rooms.discard(change.room_id)
newly_left_rooms.add(change.room_id)
member_events = await self.store.get_membership_changes_for_user( # We now work out if any other users have since joined or left the rooms
user_id, from_token.room_key, now_room_key # the user is currently in. First we filter out rooms that we know
# haven't changed recently.
rooms_changed = self.store.get_rooms_that_changed(
joined_room_ids, from_token.room_key
) )
rooms_changed.update(event.room_id for event in member_events)
stream_ordering = from_token.room_key.stream # List of membership changes per room
room_to_deltas: Dict[str, List[StateDelta]] = {}
possibly_changed = set(changed) # The set of event IDs of membership events (so we can fetch their
possibly_left = set() # associated membership).
memberships_to_fetch: Set[str] = set()
for room_id in rooms_changed: for room_id in rooms_changed:
# Check if the forward extremities have changed. If not then we know # TODO: Only pull out membership events?
# the current state won't have changed, and so we can skip this room. state_changes = await self.store.get_current_state_deltas_for_room(
try: room_id, from_token=from_token.room_key, to_token=now_token.room_key
if not await self.store.have_room_forward_extremities_changed_since(
room_id, stream_ordering
):
continue
except errors.StoreError:
pass
current_state_ids = await self._state_storage.get_current_state_ids(
room_id, await_full_state=False
) )
for delta in state_changes:
# The user may have left the room if delta.event_type != EventTypes.Member:
# TODO: Check if they actually did or if we were just invited.
if room_id not in room_ids:
for etype, state_key in current_state_ids.keys():
if etype != EventTypes.Member:
continue
possibly_left.add(state_key)
continue
# Fetch the current state at the time.
try:
event_ids = await self.store.get_forward_extremities_for_room_at_stream_ordering(
room_id, stream_ordering=stream_ordering
)
except errors.StoreError:
# we have purged the stream_ordering index since the stream
# ordering: treat it the same as a new room
event_ids = []
# special-case for an empty prev state: include all members
# in the changed list
if not event_ids:
log_kv(
{"event": "encountered empty previous state", "room_id": room_id}
)
for etype, state_key in current_state_ids.keys():
if etype != EventTypes.Member:
continue
possibly_changed.add(state_key)
continue
current_member_id = current_state_ids.get((EventTypes.Member, user_id))
if not current_member_id:
continue
# mapping from event_id -> state_dict
prev_state_ids = await self._state_storage.get_state_ids_for_events(
event_ids,
await_full_state=False,
)
# Check if we've joined the room? If so we just blindly add all the users to
# the "possibly changed" users.
for state_dict in prev_state_ids.values():
member_event = state_dict.get((EventTypes.Member, user_id), None)
if not member_event or member_event != current_member_id:
for etype, state_key in current_state_ids.keys():
if etype != EventTypes.Member:
continue
possibly_changed.add(state_key)
break
# If there has been any change in membership, include them in the
# possibly changed list. We'll check if they are joined below,
# and we're not toooo worried about spuriously adding users.
for key, event_id in current_state_ids.items():
etype, state_key = key
if etype != EventTypes.Member:
continue continue
# check if this member has changed since any of the extremities room_to_deltas.setdefault(room_id, []).append(delta)
# at the stream_ordering, and add them to the list if so. if delta.event_id:
for state_dict in prev_state_ids.values(): memberships_to_fetch.add(delta.event_id)
prev_event_id = state_dict.get(key, None) if delta.prev_event_id:
if not prev_event_id or prev_event_id != event_id: memberships_to_fetch.add(delta.prev_event_id)
if state_key != user_id:
possibly_changed.add(state_key)
break
if possibly_changed or possibly_left: # Fetch all the memberships for the membership events
possibly_joined = possibly_changed event_id_to_memberships = await self.store.get_membership_from_event_ids(
possibly_left = possibly_changed | possibly_left memberships_to_fetch
)
# Double check if we still share rooms with the given user. joined_invited_knocked = (
users_rooms = await self.store.get_rooms_for_users(possibly_left) Membership.JOIN,
for changed_user_id, entries in users_rooms.items(): Membership.INVITE,
if any(rid in room_ids for rid in entries): Membership.KNOCK,
possibly_left.discard(changed_user_id) )
else:
possibly_joined.discard(changed_user_id)
else: # We now want to find any user that have newly joined/invited/knocked,
possibly_joined = set() # or newly left, similarly to above.
possibly_left = set() newly_joined_or_invited_or_knocked_users: Set[str] = set()
newly_left_users: Set[str] = set()
for _, deltas in room_to_deltas.items():
for delta in deltas:
# Get the prev/new memberships for the delta
new_membership = None
prev_membership = None
if delta.event_id:
m = event_id_to_memberships.get(delta.event_id)
if m is not None:
new_membership = m.membership
if delta.prev_event_id:
m = event_id_to_memberships.get(delta.prev_event_id)
if m is not None:
prev_membership = m.membership
device_list_updates = DeviceListUpdates( # Check if a user has newly joined/invited/knocked, or left.
changed=possibly_joined, if new_membership in joined_invited_knocked:
left=possibly_left, if prev_membership not in joined_invited_knocked:
newly_joined_or_invited_or_knocked_users.add(delta.state_key)
newly_left_users.discard(delta.state_key)
elif prev_membership in joined_invited_knocked:
newly_joined_or_invited_or_knocked_users.discard(delta.state_key)
newly_left_users.add(delta.state_key)
# Now we actually calculate the device list entry with the information
# calculated above.
device_list_updates = await self.generate_sync_entry_for_device_list(
user_id=user_id,
since_token=from_token,
now_token=now_token,
joined_room_ids=joined_room_ids,
newly_joined_rooms=newly_joined_rooms,
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
newly_left_rooms=newly_left_rooms,
newly_left_users=newly_left_users,
) )
log_kv( log_kv(
@ -356,6 +353,88 @@ class DeviceWorkerHandler:
return device_list_updates return device_list_updates
@measure_func("_generate_sync_entry_for_device_list")
async def generate_sync_entry_for_device_list(
self,
user_id: str,
since_token: StreamToken,
now_token: StreamToken,
joined_room_ids: AbstractSet[str],
newly_joined_rooms: AbstractSet[str],
newly_joined_or_invited_or_knocked_users: AbstractSet[str],
newly_left_rooms: AbstractSet[str],
newly_left_users: AbstractSet[str],
) -> DeviceListUpdates:
"""Generate the DeviceListUpdates section of sync
Args:
sync_result_builder
newly_joined_rooms: Set of rooms user has joined since previous sync
newly_joined_or_invited_or_knocked_users: Set of users that have joined,
been invited to a room or are knocking on a room since
previous sync.
newly_left_rooms: Set of rooms user has left since previous sync
newly_left_users: Set of users that have left a room we're in since
previous sync
"""
# Take a copy since these fields will be mutated later.
newly_joined_or_invited_or_knocked_users = set(
newly_joined_or_invited_or_knocked_users
)
newly_left_users = set(newly_left_users)
# We want to figure out what user IDs the client should refetch
# device keys for, and which users we aren't going to track changes
# for anymore.
#
# For the first step we check:
# a. if any users we share a room with have updated their devices,
# and
# b. we also check if we've joined any new rooms, or if a user has
# joined a room we're in.
#
# For the second step we just find any users we no longer share a
# room with by looking at all users that have left a room plus users
# that were in a room we've left.
users_that_have_changed = set()
# Step 1a, check for changes in devices of users we share a room
# with
users_that_have_changed = await self.get_device_changes_in_shared_rooms(
user_id,
joined_room_ids,
from_token=since_token,
now_token=now_token,
)
# Step 1b, check for newly joined rooms
for room_id in newly_joined_rooms:
joined_users = await self.store.get_users_in_room(room_id)
newly_joined_or_invited_or_knocked_users.update(joined_users)
# TODO: Check that these users are actually new, i.e. either they
# weren't in the previous sync *or* they left and rejoined.
users_that_have_changed.update(newly_joined_or_invited_or_knocked_users)
user_signatures_changed = await self.store.get_users_whose_signatures_changed(
user_id, since_token.device_list_key
)
users_that_have_changed.update(user_signatures_changed)
# Now find users that we no longer track
for room_id in newly_left_rooms:
left_users = await self.store.get_users_in_room(room_id)
newly_left_users.update(left_users)
# Remove any users that we still share a room with.
left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
for user_id, entries in left_users_rooms.items():
if any(rid in joined_room_ids for rid in entries):
newly_left_users.discard(user_id)
return DeviceListUpdates(changed=users_that_have_changed, left=newly_left_users)
async def on_federation_query_user_devices(self, user_id: str) -> JsonDict: async def on_federation_query_user_devices(self, user_id: str) -> JsonDict:
if not self.hs.is_mine(UserID.from_string(user_id)): if not self.hs.is_mine(UserID.from_string(user_id)):
raise SynapseError(400, "User is not hosted on this homeserver") raise SynapseError(400, "User is not hosted on this homeserver")

View File

@ -86,7 +86,7 @@ from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.lrucache import LruCache from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
from synapse.util.metrics import Measure, measure_func from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client from synapse.visibility import filter_events_for_client
if TYPE_CHECKING: if TYPE_CHECKING:
@ -1779,8 +1779,15 @@ class SyncHandler:
) )
if include_device_list_updates: if include_device_list_updates:
device_lists = await self._generate_sync_entry_for_device_list( # include_device_list_updates can only be True if we have a
sync_result_builder, # since token.
assert since_token is not None
device_lists = await self._device_handler.generate_sync_entry_for_device_list(
user_id=user_id,
since_token=since_token,
now_token=sync_result_builder.now_token,
joined_room_ids=sync_result_builder.joined_room_ids,
newly_joined_rooms=newly_joined_rooms, newly_joined_rooms=newly_joined_rooms,
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users, newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
newly_left_rooms=newly_left_rooms, newly_left_rooms=newly_left_rooms,
@ -1892,8 +1899,14 @@ class SyncHandler:
newly_left_users, newly_left_users,
) = sync_result_builder.calculate_user_changes() ) = sync_result_builder.calculate_user_changes()
device_lists = await self._generate_sync_entry_for_device_list( # include_device_list_updates can only be True if we have a
sync_result_builder, # since token.
assert since_token is not None
device_lists = await self._device_handler.generate_sync_entry_for_device_list(
user_id=user_id,
since_token=since_token,
now_token=sync_result_builder.now_token,
joined_room_ids=sync_result_builder.joined_room_ids,
newly_joined_rooms=newly_joined_rooms, newly_joined_rooms=newly_joined_rooms,
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users, newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
newly_left_rooms=newly_left_rooms, newly_left_rooms=newly_left_rooms,
@ -2070,94 +2083,6 @@ class SyncHandler:
return sync_result_builder return sync_result_builder
@measure_func("_generate_sync_entry_for_device_list")
async def _generate_sync_entry_for_device_list(
self,
sync_result_builder: "SyncResultBuilder",
newly_joined_rooms: AbstractSet[str],
newly_joined_or_invited_or_knocked_users: AbstractSet[str],
newly_left_rooms: AbstractSet[str],
newly_left_users: AbstractSet[str],
) -> DeviceListUpdates:
"""Generate the DeviceListUpdates section of sync
Args:
sync_result_builder
newly_joined_rooms: Set of rooms user has joined since previous sync
newly_joined_or_invited_or_knocked_users: Set of users that have joined,
been invited to a room or are knocking on a room since
previous sync.
newly_left_rooms: Set of rooms user has left since previous sync
newly_left_users: Set of users that have left a room we're in since
previous sync
"""
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
assert since_token is not None
# Take a copy since these fields will be mutated later.
newly_joined_or_invited_or_knocked_users = set(
newly_joined_or_invited_or_knocked_users
)
newly_left_users = set(newly_left_users)
# We want to figure out what user IDs the client should refetch
# device keys for, and which users we aren't going to track changes
# for anymore.
#
# For the first step we check:
# a. if any users we share a room with have updated their devices,
# and
# b. we also check if we've joined any new rooms, or if a user has
# joined a room we're in.
#
# For the second step we just find any users we no longer share a
# room with by looking at all users that have left a room plus users
# that were in a room we've left.
users_that_have_changed = set()
joined_room_ids = sync_result_builder.joined_room_ids
# Step 1a, check for changes in devices of users we share a room
# with
users_that_have_changed = (
await self._device_handler.get_device_changes_in_shared_rooms(
user_id,
joined_room_ids,
from_token=since_token,
now_token=sync_result_builder.now_token,
)
)
# Step 1b, check for newly joined rooms
for room_id in newly_joined_rooms:
joined_users = await self.store.get_users_in_room(room_id)
newly_joined_or_invited_or_knocked_users.update(joined_users)
# TODO: Check that these users are actually new, i.e. either they
# weren't in the previous sync *or* they left and rejoined.
users_that_have_changed.update(newly_joined_or_invited_or_knocked_users)
user_signatures_changed = await self.store.get_users_whose_signatures_changed(
user_id, since_token.device_list_key
)
users_that_have_changed.update(user_signatures_changed)
# Now find users that we no longer track
for room_id in newly_left_rooms:
left_users = await self.store.get_users_in_room(room_id)
newly_left_users.update(left_users)
# Remove any users that we still share a room with.
left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
for user_id, entries in left_users_rooms.items():
if any(rid in joined_room_ids for rid in entries):
newly_left_users.discard(user_id)
return DeviceListUpdates(changed=users_that_have_changed, left=newly_left_users)
@trace @trace
async def _generate_sync_entry_for_to_device( async def _generate_sync_entry_for_to_device(
self, sync_result_builder: "SyncResultBuilder" self, sync_result_builder: "SyncResultBuilder"

View File

@ -166,6 +166,11 @@ class StateDeltasStore(SQLBaseStore):
) -> List[StateDelta]: ) -> List[StateDelta]:
"""Get the state deltas between two tokens.""" """Get the state deltas between two tokens."""
if not self._curr_state_delta_stream_cache.has_entity_changed(
room_id, from_token.stream
):
return []
def get_current_state_deltas_for_room_txn( def get_current_state_deltas_for_room_txn(
txn: LoggingTransaction, txn: LoggingTransaction,
) -> List[StateDelta]: ) -> List[StateDelta]: