Sliding Sync: Split up get_room_membership_for_user_at_to_token (#17629)

This is to make it easier to reuse the logic when adding support for the
new tables

---------

Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
This commit is contained in:
Erik Johnston 2024-09-01 10:52:03 +01:00 committed by GitHub
parent 8b6ff1dba5
commit 560b43ac02
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 242 additions and 172 deletions

1
changelog.d/17629.misc Normal file
View File

@ -0,0 +1 @@
Sliding Sync: Split up `get_room_membership_for_user_at_to_token`.

View File

@ -25,6 +25,7 @@ from typing import (
Mapping,
Optional,
Set,
Tuple,
Union,
)
@ -46,6 +47,7 @@ from synapse.storage.databases.main.state import (
Sentinel as StateSentinel,
)
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
from synapse.storage.roommember import RoomsForUser
from synapse.types import (
MutableStateMap,
PersistedEventPosition,
@ -424,6 +426,156 @@ class SlidingSyncRoomLists:
room_membership_for_user_map=room_membership_for_user_map,
)
@trace
async def _get_rewind_changes_to_current_membership_to_token(
self,
user: UserID,
rooms_for_user: Mapping[str, RoomsForUser],
to_token: StreamToken,
) -> Mapping[str, Optional[RoomsForUser]]:
"""
Takes the current set of rooms for a user (retrieved after the given
token), and returns the changes needed to "rewind" it to match the set of
memberships *at that token* (<= `to_token`).
Args:
user: User to fetch rooms for
rooms_for_user: The set of rooms for the user after the `to_token`.
to_token: The token to rewind to
Returns:
The changes to apply to rewind the the current memberships.
"""
# If the user has never joined any rooms before, we can just return an empty list
if not rooms_for_user:
return {}
user_id = user.to_string()
# Get the `RoomStreamToken` that represents the spot we queried up to when we got
# our membership snapshot from `get_rooms_for_local_user_where_membership_is()`.
#
# First, we need to get the max stream_ordering of each event persister instance
# that we queried events from.
instance_to_max_stream_ordering_map: Dict[str, int] = {}
for room_for_user in rooms_for_user.values():
instance_name = room_for_user.event_pos.instance_name
stream_ordering = room_for_user.event_pos.stream
current_instance_max_stream_ordering = (
instance_to_max_stream_ordering_map.get(instance_name)
)
if (
current_instance_max_stream_ordering is None
or stream_ordering > current_instance_max_stream_ordering
):
instance_to_max_stream_ordering_map[instance_name] = stream_ordering
# Then assemble the `RoomStreamToken`
min_stream_pos = min(instance_to_max_stream_ordering_map.values())
membership_snapshot_token = RoomStreamToken(
# Minimum position in the `instance_map`
stream=min_stream_pos,
instance_map=immutabledict(
{
instance_name: stream_pos
for instance_name, stream_pos in instance_to_max_stream_ordering_map.items()
if stream_pos > min_stream_pos
}
),
)
# Since we fetched the users room list at some point in time after the
# tokens, we need to revert/rewind some membership changes to match the point in
# time of the `to_token`. In particular, we need to make these fixups:
#
# - a) Remove rooms that the user joined after the `to_token`
# - b) Update room membership events to the point in time of the `to_token`
# Fetch membership changes that fall in the range from `to_token` up to
# `membership_snapshot_token`
#
# If our `to_token` is already the same or ahead of the latest room membership
# for the user, we don't need to do any "2)" fix-ups and can just straight-up
# use the room list from the snapshot as a base (nothing has changed)
current_state_delta_membership_changes_after_to_token = []
if not membership_snapshot_token.is_before_or_eq(to_token.room_key):
current_state_delta_membership_changes_after_to_token = (
await self.store.get_current_state_delta_membership_changes_for_user(
user_id,
from_key=to_token.room_key,
to_key=membership_snapshot_token,
excluded_room_ids=self.rooms_to_exclude_globally,
)
)
if not current_state_delta_membership_changes_after_to_token:
# There have been no membership changes, so we can early return.
return {}
# Otherwise we're about to make changes to `rooms_for_user`, so we turn
# it into a mutable dict.
changes: Dict[str, Optional[RoomsForUser]] = {}
# Assemble a list of the first membership event after the `to_token` so we can
# step backward to the previous membership that would apply to the from/to
# range.
first_membership_change_by_room_id_after_to_token: Dict[
str, CurrentStateDeltaMembership
] = {}
for membership_change in current_state_delta_membership_changes_after_to_token:
# Only set if we haven't already set it
first_membership_change_by_room_id_after_to_token.setdefault(
membership_change.room_id, membership_change
)
# Since we fetched a snapshot of the users room list at some point in time after
# the from/to tokens, we need to revert/rewind some membership changes to match
# the point in time of the `to_token`.
for (
room_id,
first_membership_change_after_to_token,
) in first_membership_change_by_room_id_after_to_token.items():
# 1a) Remove rooms that the user joined after the `to_token`
if first_membership_change_after_to_token.prev_event_id is None:
changes[room_id] = None
# 1b) 1c) From the first membership event after the `to_token`, step backward to the
# previous membership that would apply to the from/to range.
else:
# We don't expect these fields to be `None` if we have a `prev_event_id`
# but we're being defensive since it's possible that the prev event was
# culled from the database.
if (
first_membership_change_after_to_token.prev_event_pos is not None
and first_membership_change_after_to_token.prev_membership
is not None
and first_membership_change_after_to_token.prev_sender is not None
):
# We need to know the room version ID, which we normally we
# can get from the current membership, but if we don't have
# that then we need to query the DB.
current_membership = rooms_for_user.get(room_id)
if current_membership is not None:
room_version_id = current_membership.room_version_id
else:
room_version_id = await self.store.get_room_version_id(room_id)
changes[room_id] = RoomsForUser(
room_id=room_id,
event_id=first_membership_change_after_to_token.prev_event_id,
event_pos=first_membership_change_after_to_token.prev_event_pos,
membership=first_membership_change_after_to_token.prev_membership,
sender=first_membership_change_after_to_token.prev_sender,
room_version_id=room_version_id,
)
else:
# If we can't find the previous membership event, we shouldn't
# include the room in the sync response since we can't determine the
# exact membership state and shouldn't rely on the current snapshot.
changes[room_id] = None
return changes
@trace
async def get_room_membership_for_user_at_to_token(
self,
@ -469,139 +621,89 @@ class SlidingSyncRoomLists:
if not room_for_user_list:
return {}
# Since we fetched the users room list at some point in time after the
# tokens, we need to revert/rewind some membership changes to match the point in
# time of the `to_token`.
rooms_for_user = {room.room_id: room for room in room_for_user_list}
changes = await self._get_rewind_changes_to_current_membership_to_token(
user, rooms_for_user, to_token
)
for room_id, change_room_for_user in changes.items():
if change_room_for_user is None:
rooms_for_user.pop(room_id, None)
else:
rooms_for_user[room_id] = change_room_for_user
newly_joined_room_ids, newly_left_room_ids = (
await self._get_newly_joined_and_left_rooms(
user_id, to_token=to_token, from_token=from_token
)
)
dm_room_ids = await self._get_dm_rooms_for_user(user_id)
# Our working list of rooms that can show up in the sync response
sync_room_id_set = {
# Note: The `room_for_user` we're assigning here will need to be fixed up
# (below) because they are potentially from the current snapshot time
# instead from the time of the `to_token`.
room_for_user.room_id: _RoomMembershipForUser(
room_id=room_for_user.room_id,
event_id=room_for_user.event_id,
event_pos=room_for_user.event_pos,
membership=room_for_user.membership,
sender=room_for_user.sender,
# We will update these fields below to be accurate
newly_joined=False,
newly_left=False,
is_dm=False,
newly_joined=room_id in newly_joined_room_ids,
newly_left=room_id in newly_left_room_ids,
is_dm=room_id in dm_room_ids,
)
for room_for_user in room_for_user_list
for room_id, room_for_user in rooms_for_user.items()
}
# Get the `RoomStreamToken` that represents the spot we queried up to when we got
# our membership snapshot from `get_rooms_for_local_user_where_membership_is()`.
#
# First, we need to get the max stream_ordering of each event persister instance
# that we queried events from.
instance_to_max_stream_ordering_map: Dict[str, int] = {}
for room_for_user in room_for_user_list:
instance_name = room_for_user.event_pos.instance_name
stream_ordering = room_for_user.event_pos.stream
# Ensure we have entries for rooms that the user has been "state reset"
# out of. These are rooms appear in the `newly_left_rooms` map but
# aren't in the `rooms_for_user` map.
for room_id, left_event_pos in newly_left_room_ids.items():
if room_id in sync_room_id_set:
continue
current_instance_max_stream_ordering = (
instance_to_max_stream_ordering_map.get(instance_name)
)
if (
current_instance_max_stream_ordering is None
or stream_ordering > current_instance_max_stream_ordering
):
instance_to_max_stream_ordering_map[instance_name] = stream_ordering
# Then assemble the `RoomStreamToken`
min_stream_pos = min(instance_to_max_stream_ordering_map.values())
membership_snapshot_token = RoomStreamToken(
# Minimum position in the `instance_map`
stream=min_stream_pos,
instance_map=immutabledict(
{
instance_name: stream_pos
for instance_name, stream_pos in instance_to_max_stream_ordering_map.items()
if stream_pos > min_stream_pos
}
),
)
# Since we fetched the users room list at some point in time after the from/to
# tokens, we need to revert/rewind some membership changes to match the point in
# time of the `to_token`. In particular, we need to make these fixups:
#
# - 1a) Remove rooms that the user joined after the `to_token`
# - 1b) Add back rooms that the user left after the `to_token`
# - 1c) Update room membership events to the point in time of the `to_token`
# - 2) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`)
# - 3) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`)
# - 4) Figure out which rooms are DM's
# 1) Fetch membership changes that fall in the range from `to_token` up to
# `membership_snapshot_token`
#
# If our `to_token` is already the same or ahead of the latest room membership
# for the user, we don't need to do any "2)" fix-ups and can just straight-up
# use the room list from the snapshot as a base (nothing has changed)
current_state_delta_membership_changes_after_to_token = []
if not membership_snapshot_token.is_before_or_eq(to_token.room_key):
current_state_delta_membership_changes_after_to_token = (
await self.store.get_current_state_delta_membership_changes_for_user(
user_id,
from_key=to_token.room_key,
to_key=membership_snapshot_token,
excluded_room_ids=self.rooms_to_exclude_globally,
)
sync_room_id_set[room_id] = _RoomMembershipForUser(
room_id=room_id,
event_id=None,
event_pos=left_event_pos,
membership=Membership.LEAVE,
sender=None,
newly_joined=False,
newly_left=True,
is_dm=room_id in dm_room_ids,
)
# 1) Assemble a list of the first membership event after the `to_token` so we can
# step backward to the previous membership that would apply to the from/to
# range.
first_membership_change_by_room_id_after_to_token: Dict[
str, CurrentStateDeltaMembership
] = {}
for membership_change in current_state_delta_membership_changes_after_to_token:
# Only set if we haven't already set it
first_membership_change_by_room_id_after_to_token.setdefault(
membership_change.room_id, membership_change
)
return sync_room_id_set
# 1) Fixup
@trace
async def _get_newly_joined_and_left_rooms(
self,
user_id: str,
to_token: StreamToken,
from_token: Optional[StreamToken],
) -> Tuple[StrCollection, Mapping[str, PersistedEventPosition]]:
"""Fetch the sets of rooms that the user newly joined or left in the
given token range.
Note: there may be rooms in the newly left rooms where the user was
"state reset" out of the room, and so that room would not be part of the
"current memberships" of the user.
Returns:
A 2-tuple of newly joined room IDs and a map of newly left room
IDs to the event position the leave happened at.
"""
newly_joined_room_ids: Set[str] = set()
newly_left_room_map: Dict[str, PersistedEventPosition] = {}
# We need to figure out the
#
# Since we fetched a snapshot of the users room list at some point in time after
# the from/to tokens, we need to revert/rewind some membership changes to match
# the point in time of the `to_token`.
for (
room_id,
first_membership_change_after_to_token,
) in first_membership_change_by_room_id_after_to_token.items():
# 1a) Remove rooms that the user joined after the `to_token`
if first_membership_change_after_to_token.prev_event_id is None:
sync_room_id_set.pop(room_id, None)
# 1b) 1c) From the first membership event after the `to_token`, step backward to the
# previous membership that would apply to the from/to range.
else:
# We don't expect these fields to be `None` if we have a `prev_event_id`
# but we're being defensive since it's possible that the prev event was
# culled from the database.
if (
first_membership_change_after_to_token.prev_event_pos is not None
and first_membership_change_after_to_token.prev_membership
is not None
):
sync_room_id_set[room_id] = _RoomMembershipForUser(
room_id=room_id,
event_id=first_membership_change_after_to_token.prev_event_id,
event_pos=first_membership_change_after_to_token.prev_event_pos,
membership=first_membership_change_after_to_token.prev_membership,
sender=first_membership_change_after_to_token.prev_sender,
# We will update these fields below to be accurate
newly_joined=False,
newly_left=False,
is_dm=False,
)
else:
# If we can't find the previous membership event, we shouldn't
# include the room in the sync response since we can't determine the
# exact membership state and shouldn't rely on the current snapshot.
sync_room_id_set.pop(room_id, None)
# - 1) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`)
# - 2) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`)
# 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
# 1) Fetch membership changes that fall in the range from `from_token` up to `to_token`
current_state_delta_membership_changes_in_from_to_range = []
if from_token:
current_state_delta_membership_changes_in_from_to_range = (
@ -613,7 +715,7 @@ class SlidingSyncRoomLists:
)
)
# 2) Assemble a list of the last membership events in some given ranges. Someone
# 1) Assemble a list of the last membership events in some given ranges. Someone
# could have left and joined multiple times during the given range but we only
# care about end-result so we grab the last one.
last_membership_change_by_room_id_in_from_to_range: Dict[
@ -646,9 +748,9 @@ class SlidingSyncRoomLists:
if membership_change.membership != Membership.JOIN:
has_non_join_event_by_room_id_in_from_to_range[room_id] = True
# 2) Fixup
# 1) Fixup
#
# 3) We also want to assemble a list of possibly newly joined rooms. Someone
# 2) We also want to assemble a list of possibly newly joined rooms. Someone
# could have left and joined multiple times during the given range but we only
# care about whether they are joined at the end of the token range so we are
# working with the last membership even in the token range.
@ -658,46 +760,18 @@ class SlidingSyncRoomLists:
) in last_membership_change_by_room_id_in_from_to_range.values():
room_id = last_membership_change_in_from_to_range.room_id
# 3)
# 2)
if last_membership_change_in_from_to_range.membership == Membership.JOIN:
possibly_newly_joined_room_ids.add(room_id)
# 2) Figure out newly_left rooms (> `from_token` and <= `to_token`).
# 1) Figure out newly_left rooms (> `from_token` and <= `to_token`).
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
# 2) Mark this room as `newly_left`
# 1) Mark this room as `newly_left`
newly_left_room_map[room_id] = (
last_membership_change_in_from_to_range.event_pos
)
# If we're seeing a membership change here, we should expect to already
# have it in our snapshot but if a state reset happens, it wouldn't have
# shown up in our snapshot but appear as a change here.
existing_sync_entry = sync_room_id_set.get(room_id)
if existing_sync_entry is not None:
# Normal expected case
sync_room_id_set[room_id] = existing_sync_entry.copy_and_replace(
newly_left=True
)
else:
# State reset!
logger.warn(
"State reset detected for room_id %s with %s who is no longer in the room",
room_id,
user_id,
)
# Even though a state reset happened which removed the person from
# the room, we still add it the list so the user knows they left the
# room. Downstream code can check for a state reset by looking for
# `event_id=None and membership is not None`.
sync_room_id_set[room_id] = _RoomMembershipForUser(
room_id=room_id,
event_id=last_membership_change_in_from_to_range.event_id,
event_pos=last_membership_change_in_from_to_range.event_pos,
membership=last_membership_change_in_from_to_range.membership,
sender=last_membership_change_in_from_to_range.sender,
newly_joined=False,
newly_left=True,
is_dm=False,
)
# 3) Figure out `newly_joined`
# 2) Figure out `newly_joined`
for room_id in possibly_newly_joined_room_ids:
has_non_join_in_from_to_range = (
has_non_join_event_by_room_id_in_from_to_range.get(room_id, False)
@ -706,9 +780,7 @@ class SlidingSyncRoomLists:
# also some non-join in the range, we know they `newly_joined`.
if has_non_join_in_from_to_range:
# We found a `newly_joined` room (we left and joined within the token range)
sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
newly_joined=True
)
newly_joined_room_ids.add(room_id)
else:
prev_event_id = first_membership_change_by_room_id_in_from_to_range[
room_id
@ -720,20 +792,23 @@ class SlidingSyncRoomLists:
if prev_event_id is None:
# We found a `newly_joined` room (we are joining the room for the
# first time within the token range)
sync_room_id_set[room_id] = sync_room_id_set[
room_id
].copy_and_replace(newly_joined=True)
newly_joined_room_ids.add(room_id)
# Last resort, we need to step back to the previous membership event
# just before the token range to see if we're joined then or not.
elif prev_membership != Membership.JOIN:
# We found a `newly_joined` room (we left before the token range
# and joined within the token range)
sync_room_id_set[room_id] = sync_room_id_set[
room_id
].copy_and_replace(newly_joined=True)
newly_joined_room_ids.add(room_id)
return newly_joined_room_ids, newly_left_room_map
@trace
async def _get_dm_rooms_for_user(
self,
user_id: str,
) -> StrCollection:
"""Get the set of DM rooms for the user."""
# 4) Figure out which rooms the user considers to be direct-message (DM) rooms
#
# We're using global account data (`m.direct`) instead of checking for
# `is_direct` on membership events because that property only appears for
# the invitee membership event (doesn't show up for the inviter).
@ -755,13 +830,7 @@ class SlidingSyncRoomLists:
if isinstance(room_id, str):
dm_room_id_set.add(room_id)
# 4) Fixup
for room_id in sync_room_id_set:
sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
is_dm=room_id in dm_room_id_set
)
return sync_room_id_set
return dm_room_id_set
@trace
async def filter_rooms_relevant_for_sync(