Use get_current_users_in_room from store and not StateHandler (#9910)

This commit is contained in:
Erik Johnston 2021-05-05 16:49:34 +01:00 committed by GitHub
parent d5305000f1
commit d0aee697ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 26 additions and 17 deletions

1
changelog.d/9910.bugfix Normal file
View File

@ -0,0 +1 @@
Fix bug where user directory could get out of sync if room visibility and membership changed in quick succession.

1
changelog.d/9910.feature Normal file
View File

@ -0,0 +1 @@
Improve performance after joining a large room when presence is enabled.

View File

@ -78,7 +78,7 @@ class DirectoryHandler(BaseHandler):
# TODO(erikj): Add transactions. # TODO(erikj): Add transactions.
# TODO(erikj): Check if there is a current association. # TODO(erikj): Check if there is a current association.
if not servers: if not servers:
users = await self.state.get_current_users_in_room(room_id) users = await self.store.get_users_in_room(room_id)
servers = {get_domain_from_id(u) for u in users} servers = {get_domain_from_id(u) for u in users}
if not servers: if not servers:
@ -270,7 +270,7 @@ class DirectoryHandler(BaseHandler):
Codes.NOT_FOUND, Codes.NOT_FOUND,
) )
users = await self.state.get_current_users_in_room(room_id) users = await self.store.get_users_in_room(room_id)
extra_servers = {get_domain_from_id(u) for u in users} extra_servers = {get_domain_from_id(u) for u in users}
servers = set(extra_servers) | set(servers) servers = set(extra_servers) | set(servers)

View File

@ -103,7 +103,7 @@ class EventStreamHandler(BaseHandler):
# Send down presence. # Send down presence.
if event.state_key == auth_user_id: if event.state_key == auth_user_id:
# Send down presence for everyone in the room. # Send down presence for everyone in the room.
users = await self.state.get_current_users_in_room( users = await self.store.get_users_in_room(
event.room_id event.room_id
) # type: Iterable[str] ) # type: Iterable[str]
else: else:

View File

@ -258,7 +258,7 @@ class MessageHandler:
"Getting joined members after leaving is not implemented" "Getting joined members after leaving is not implemented"
) )
users_with_profile = await self.state.get_current_users_in_room(room_id) users_with_profile = await self.store.get_users_in_room_with_profiles(room_id)
# If this is an AS, double check that they are allowed to see the members. # If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there # This can either be because the AS user is in the room or because there

View File

@ -1293,7 +1293,7 @@ class PresenceHandler(BasePresenceHandler):
remote_host = get_domain_from_id(user_id) remote_host = get_domain_from_id(user_id)
users = await self.state.get_current_users_in_room(room_id) users = await self.store.get_users_in_room(room_id)
user_ids = list(filter(self.is_mine_id, users)) user_ids = list(filter(self.is_mine_id, users))
states_d = await self.current_state_for_users(user_ids) states_d = await self.current_state_for_users(user_ids)

View File

@ -1327,7 +1327,7 @@ class RoomShutdownHandler:
new_room_id = None new_room_id = None
logger.info("Shutting down room %r", room_id) logger.info("Shutting down room %r", room_id)
users = await self.state.get_current_users_in_room(room_id) users = await self.store.get_users_in_room(room_id)
kicked_users = [] kicked_users = []
failed_to_kick_users = [] failed_to_kick_users = []
for user_id in users: for user_id in users:

View File

@ -1190,7 +1190,7 @@ class SyncHandler:
# Step 1b, check for newly joined rooms # Step 1b, check for newly joined rooms
for room_id in newly_joined_rooms: for room_id in newly_joined_rooms:
joined_users = await self.state.get_current_users_in_room(room_id) joined_users = await self.store.get_users_in_room(room_id)
newly_joined_or_invited_users.update(joined_users) newly_joined_or_invited_users.update(joined_users)
# TODO: Check that these users are actually new, i.e. either they # TODO: Check that these users are actually new, i.e. either they
@ -1206,7 +1206,7 @@ class SyncHandler:
# Now find users that we no longer track # Now find users that we no longer track
for room_id in newly_left_rooms: for room_id in newly_left_rooms:
left_users = await self.state.get_current_users_in_room(room_id) left_users = await self.store.get_users_in_room(room_id)
newly_left_users.update(left_users) newly_left_users.update(left_users)
# Remove any users that we still share a room with. # Remove any users that we still share a room with.
@ -1361,7 +1361,7 @@ class SyncHandler:
extra_users_ids = set(newly_joined_or_invited_users) extra_users_ids = set(newly_joined_or_invited_users)
for room_id in newly_joined_rooms: for room_id in newly_joined_rooms:
users = await self.state.get_current_users_in_room(room_id) users = await self.store.get_users_in_room(room_id)
extra_users_ids.update(users) extra_users_ids.update(users)
extra_users_ids.discard(user.to_string()) extra_users_ids.discard(user.to_string())

View File

@ -213,19 +213,23 @@ class StateHandler:
return ret.state return ret.state
async def get_current_users_in_room( async def get_current_users_in_room(
self, room_id: str, latest_event_ids: Optional[List[str]] = None self, room_id: str, latest_event_ids: List[str]
) -> Dict[str, ProfileInfo]: ) -> Dict[str, ProfileInfo]:
""" """
Get the users who are currently in a room. Get the users who are currently in a room.
Note: This is much slower than using the equivalent method
`DataStore.get_users_in_room` or `DataStore.get_users_in_room_with_profiles`,
so this should only be used when wanting the users at a particular point
in the room.
Args: Args:
room_id: The ID of the room. room_id: The ID of the room.
latest_event_ids: Precomputed list of latest event IDs. Will be computed if None. latest_event_ids: Precomputed list of latest event IDs. Will be computed if None.
Returns: Returns:
Dictionary of user IDs to their profileinfo. Dictionary of user IDs to their profileinfo.
""" """
if not latest_event_ids:
latest_event_ids = await self.store.get_latest_event_ids_in_room(room_id)
assert latest_event_ids is not None assert latest_event_ids is not None
logger.debug("calling resolve_state_groups from get_current_users_in_room") logger.debug("calling resolve_state_groups from get_current_users_in_room")

View File

@ -69,6 +69,7 @@ class SQLBaseStore(metaclass=ABCMeta):
self._attempt_to_invalidate_cache("is_host_joined", (room_id, host)) self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
self._attempt_to_invalidate_cache("get_users_in_room", (room_id,)) self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,))
self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,)) self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,))

View File

@ -205,8 +205,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
def _get_users_in_room_with_profiles(txn) -> Dict[str, ProfileInfo]: def _get_users_in_room_with_profiles(txn) -> Dict[str, ProfileInfo]:
sql = """ sql = """
SELECT user_id, display_name, avatar_url FROM room_memberships SELECT state_key, display_name, avatar_url FROM room_memberships as m
WHERE room_id = ? AND membership = ? INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
""" """
txn.execute(sql, (room_id, Membership.JOIN)) txn.execute(sql, (room_id, Membership.JOIN))

View File

@ -142,8 +142,6 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
batch_size (int): Maximum number of state events to process batch_size (int): Maximum number of state events to process
per cycle. per cycle.
""" """
state = self.hs.get_state_handler()
# If we don't have progress filed, delete everything. # If we don't have progress filed, delete everything.
if not progress: if not progress:
await self.delete_all_from_user_dir() await self.delete_all_from_user_dir()
@ -197,7 +195,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
room_id room_id
) )
users_with_profile = await state.get_current_users_in_room(room_id) users_with_profile = await self.get_users_in_room_with_profiles(room_id)
user_ids = set(users_with_profile) user_ids = set(users_with_profile)
# Update each user in the user directory. # Update each user in the user directory.