Add rooms.required_state to Sliding Sync /sync (#17342)

Also handles excluding rooms with partial state when people are asking for room membership events unless it's `$LAZY` room membership.
This commit is contained in:
Eric Eastwood 2024-07-04 12:25:36 -05:00 committed by GitHub
parent a9d2e40ea4
commit 22aeb78b77
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 1688 additions and 90 deletions

View File

@ -0,0 +1 @@
Return "required state" in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

View File

@ -18,7 +18,7 @@
# #
# #
import logging import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple from typing import TYPE_CHECKING, Any, Dict, Final, List, Optional, Set, Tuple
import attr import attr
from immutabledict import immutabledict from immutabledict import immutabledict
@ -39,6 +39,7 @@ from synapse.types import (
PersistedEventPosition, PersistedEventPosition,
Requester, Requester,
RoomStreamToken, RoomStreamToken,
StateMap,
StreamKeyType, StreamKeyType,
StreamToken, StreamToken,
UserID, UserID,
@ -90,14 +91,186 @@ class RoomSyncConfig:
Attributes: Attributes:
timeline_limit: The maximum number of events to return in the timeline. timeline_limit: The maximum number of events to return in the timeline.
required_state: The set of state events requested for the room. The
values are close to `StateKey` but actually use a syntax where you can required_state_map: Map from state event type to state_keys requested for the
provide `*` wildcard and `$LAZY` for lazy room members as the `state_key` part room. The values are close to `StateKey` but actually use a syntax where you
of the tuple (type, state_key). can provide `*` wildcard and `$LAZY` for lazy-loading room members.
""" """
timeline_limit: int timeline_limit: int
required_state: Set[Tuple[str, str]] required_state_map: Dict[str, Set[str]]
@classmethod
def from_room_config(
cls,
room_params: SlidingSyncConfig.CommonRoomParameters,
) -> "RoomSyncConfig":
"""
Create a `RoomSyncConfig` from a `SlidingSyncList`/`RoomSubscription` config.
Args:
room_params: `SlidingSyncConfig.SlidingSyncList` or `SlidingSyncConfig.RoomSubscription`
"""
required_state_map: Dict[str, Set[str]] = {}
for (
state_type,
state_key,
) in room_params.required_state:
# If we already have a wildcard for this specific `state_key`, we don't need
# to add it since the wildcard already covers it.
if state_key in required_state_map.get(StateValues.WILDCARD, set()):
continue
# If we already have a wildcard `state_key` for this `state_type`, we don't need
# to add anything else
if StateValues.WILDCARD in required_state_map.get(state_type, set()):
continue
# If we're getting wildcards for the `state_type` and `state_key`, that's
# all that matters so get rid of any other entries
if state_type == StateValues.WILDCARD and state_key == StateValues.WILDCARD:
required_state_map = {StateValues.WILDCARD: {StateValues.WILDCARD}}
# We can break, since we don't need to add anything else
break
# If we're getting a wildcard for the `state_type`, get rid of any other
# entries with the same `state_key`, since the wildcard will cover it already.
elif state_type == StateValues.WILDCARD:
# Get rid of any entries that match the `state_key`
#
# Make a copy so we don't run into an error: `dictionary changed size
# during iteration`, when we remove items
for (
existing_state_type,
existing_state_key_set,
) in list(required_state_map.items()):
# Make a copy so we don't run into an error: `Set changed size during
# iteration`, when we filter out and remove items
for existing_state_key in existing_state_key_set.copy():
if existing_state_key == state_key:
existing_state_key_set.remove(state_key)
# If we've the left the `set()` empty, remove it from the map
if existing_state_key_set == set():
required_state_map.pop(existing_state_type, None)
# If we're getting a wildcard `state_key`, get rid of any other state_keys
# for this `state_type` since the wildcard will cover it already.
if state_key == StateValues.WILDCARD:
required_state_map[state_type] = {state_key}
# Otherwise, just add it to the set
else:
if required_state_map.get(state_type) is None:
required_state_map[state_type] = {state_key}
else:
required_state_map[state_type].add(state_key)
return cls(
timeline_limit=room_params.timeline_limit,
required_state_map=required_state_map,
)
def deep_copy(self) -> "RoomSyncConfig":
required_state_map: Dict[str, Set[str]] = {
state_type: state_key_set.copy()
for state_type, state_key_set in self.required_state_map.items()
}
return RoomSyncConfig(
timeline_limit=self.timeline_limit,
required_state_map=required_state_map,
)
def combine_room_sync_config(
self, other_room_sync_config: "RoomSyncConfig"
) -> None:
"""
Combine this `RoomSyncConfig` with another `RoomSyncConfig` and take the
superset union of the two.
"""
# Take the highest timeline limit
if self.timeline_limit < other_room_sync_config.timeline_limit:
self.timeline_limit = other_room_sync_config.timeline_limit
# Union the required state
for (
state_type,
state_key_set,
) in other_room_sync_config.required_state_map.items():
# If we already have a wildcard for everything, we don't need to add
# anything else
if StateValues.WILDCARD in self.required_state_map.get(
StateValues.WILDCARD, set()
):
break
# If we already have a wildcard `state_key` for this `state_type`, we don't need
# to add anything else
if StateValues.WILDCARD in self.required_state_map.get(state_type, set()):
continue
# If we're getting wildcards for the `state_type` and `state_key`, that's
# all that matters so get rid of any other entries
if (
state_type == StateValues.WILDCARD
and StateValues.WILDCARD in state_key_set
):
self.required_state_map = {state_type: {StateValues.WILDCARD}}
# We can break, since we don't need to add anything else
break
for state_key in state_key_set:
# If we already have a wildcard for this specific `state_key`, we don't need
# to add it since the wildcard already covers it.
if state_key in self.required_state_map.get(
StateValues.WILDCARD, set()
):
continue
# If we're getting a wildcard for the `state_type`, get rid of any other
# entries with the same `state_key`, since the wildcard will cover it already.
if state_type == StateValues.WILDCARD:
# Get rid of any entries that match the `state_key`
#
# Make a copy so we don't run into an error: `dictionary changed size
# during iteration`, when we remove items
for existing_state_type, existing_state_key_set in list(
self.required_state_map.items()
):
# Make a copy so we don't run into an error: `Set changed size during
# iteration`, when we filter out and remove items
for existing_state_key in existing_state_key_set.copy():
if existing_state_key == state_key:
existing_state_key_set.remove(state_key)
# If we've the left the `set()` empty, remove it from the map
if existing_state_key_set == set():
self.required_state_map.pop(existing_state_type, None)
# If we're getting a wildcard `state_key`, get rid of any other state_keys
# for this `state_type` since the wildcard will cover it already.
if state_key == StateValues.WILDCARD:
self.required_state_map[state_type] = {state_key}
break
# Otherwise, just add it to the set
else:
if self.required_state_map.get(state_type) is None:
self.required_state_map[state_type] = {state_key}
else:
self.required_state_map[state_type].add(state_key)
class StateValues:
"""
Understood values of the (type, state_key) tuple in `required_state`.
"""
# Include all state events of the given type
WILDCARD: Final = "*"
# Lazy-load room membership events (include room membership events for any event
# `sender` in the timeline). We only give special meaning to this value when it's a
# `state_key`.
LAZY: Final = "$LAZY"
@attr.s(slots=True, frozen=True, auto_attribs=True) @attr.s(slots=True, frozen=True, auto_attribs=True)
@ -242,6 +415,8 @@ class SlidingSyncHandler:
# Assemble sliding window lists # Assemble sliding window lists
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
# Keep track of the rooms that we're going to display and need to fetch more
# info about
relevant_room_map: Dict[str, RoomSyncConfig] = {} relevant_room_map: Dict[str, RoomSyncConfig] = {}
if sync_config.lists: if sync_config.lists:
# Get all of the room IDs that the user should be able to see in the sync # Get all of the room IDs that the user should be able to see in the sync
@ -260,49 +435,76 @@ class SlidingSyncHandler:
sync_config.user, sync_room_map, list_config.filters, to_token sync_config.user, sync_room_map, list_config.filters, to_token
) )
# Sort the list
sorted_room_info = await self.sort_rooms( sorted_room_info = await self.sort_rooms(
filtered_sync_room_map, to_token filtered_sync_room_map, to_token
) )
# Find which rooms are partially stated and may need to be filtered out
# depending on the `required_state` requested (see below).
partial_state_room_map = await self.store.is_partial_state_room_batched(
filtered_sync_room_map.keys()
)
# Since creating the `RoomSyncConfig` takes some work, let's just do it
# once and make a copy whenever we need it.
room_sync_config = RoomSyncConfig.from_room_config(list_config)
membership_state_keys = room_sync_config.required_state_map.get(
EventTypes.Member
)
lazy_loading = (
membership_state_keys is not None
and len(membership_state_keys) == 1
and StateValues.LAZY in membership_state_keys
)
ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
if list_config.ranges: if list_config.ranges:
for range in list_config.ranges: for range in list_config.ranges:
sliced_room_ids = [ room_ids_in_list: List[str] = []
room_id
# Both sides of range are inclusive # We're going to loop through the sorted list of rooms starting
for room_id, _ in sorted_room_info[range[0] : range[1] + 1] # at the range start index and keep adding rooms until we fill
] # up the range or run out of rooms.
#
# Both sides of range are inclusive so we `+ 1`
max_num_rooms = range[1] - range[0] + 1
for room_id, _ in sorted_room_info[range[0] :]:
if len(room_ids_in_list) >= max_num_rooms:
break
# Exclude partially-stated rooms unless the `required_state`
# only has `["m.room.member", "$LAZY"]` for membership
# (lazy-loading room members).
if partial_state_room_map.get(room_id) and not lazy_loading:
continue
# Take the superset of the `RoomSyncConfig` for each room.
#
# Update our `relevant_room_map` with the room we're going
# to display and need to fetch more info about.
existing_room_sync_config = relevant_room_map.get(room_id)
if existing_room_sync_config is not None:
existing_room_sync_config.combine_room_sync_config(
room_sync_config
)
else:
# Make a copy so if we modify it later, it doesn't
# affect all references.
relevant_room_map[room_id] = (
room_sync_config.deep_copy()
)
room_ids_in_list.append(room_id)
ops.append( ops.append(
SlidingSyncResult.SlidingWindowList.Operation( SlidingSyncResult.SlidingWindowList.Operation(
op=OperationType.SYNC, op=OperationType.SYNC,
range=range, range=range,
room_ids=sliced_room_ids, room_ids=room_ids_in_list,
) )
) )
# Take the superset of the `RoomSyncConfig` for each room
for room_id in sliced_room_ids:
if relevant_room_map.get(room_id) is not None:
# Take the highest timeline limit
if (
relevant_room_map[room_id].timeline_limit
< list_config.timeline_limit
):
relevant_room_map[room_id].timeline_limit = (
list_config.timeline_limit
)
# Union the required state
relevant_room_map[room_id].required_state.update(
list_config.required_state
)
else:
relevant_room_map[room_id] = RoomSyncConfig(
timeline_limit=list_config.timeline_limit,
required_state=set(list_config.required_state),
)
lists[list_key] = SlidingSyncResult.SlidingWindowList( lists[list_key] = SlidingSyncResult.SlidingWindowList(
count=len(sorted_room_info), count=len(sorted_room_info),
ops=ops, ops=ops,
@ -651,9 +853,6 @@ class SlidingSyncHandler:
user_id = user.to_string() user_id = user.to_string()
# TODO: Apply filters # TODO: Apply filters
#
# TODO: Exclude partially stated rooms unless the `required_state` has
# `["m.room.member", "$LAZY"]`
filtered_room_id_set = set(sync_room_map.keys()) filtered_room_id_set = set(sync_room_map.keys())
@ -694,16 +893,18 @@ class SlidingSyncHandler:
if filters.is_encrypted is not None: if filters.is_encrypted is not None:
# Make a copy so we don't run into an error: `Set changed size during # Make a copy so we don't run into an error: `Set changed size during
# iteration`, when we filter out and remove items # iteration`, when we filter out and remove items
for room_id in list(filtered_room_id_set): for room_id in filtered_room_id_set.copy():
state_at_to_token = await self.storage_controllers.state.get_state_at( state_at_to_token = await self.storage_controllers.state.get_state_at(
room_id, room_id,
to_token, to_token,
state_filter=StateFilter.from_types( state_filter=StateFilter.from_types(
[(EventTypes.RoomEncryption, "")] [(EventTypes.RoomEncryption, "")]
), ),
# Partially stated rooms should have all state events except for the # Partially-stated rooms should have all state events except for the
# membership events so we don't need to wait. Plus we don't want to # membership events so we don't need to wait because we only care
# block the whole sync waiting for this one room. # about retrieving the `EventTypes.RoomEncryption` state event here.
# Plus we don't want to block the whole sync waiting for this one
# room.
await_full_state=False, await_full_state=False,
) )
is_encrypted = state_at_to_token.get((EventTypes.RoomEncryption, "")) is_encrypted = state_at_to_token.get((EventTypes.RoomEncryption, ""))
@ -719,7 +920,7 @@ class SlidingSyncHandler:
if filters.is_invite is not None: if filters.is_invite is not None:
# Make a copy so we don't run into an error: `Set changed size during # Make a copy so we don't run into an error: `Set changed size during
# iteration`, when we filter out and remove items # iteration`, when we filter out and remove items
for room_id in list(filtered_room_id_set): for room_id in filtered_room_id_set.copy():
room_for_user = sync_room_map[room_id] room_for_user = sync_room_map[room_id]
# If we're looking for invite rooms, filter out rooms that the user is # If we're looking for invite rooms, filter out rooms that the user is
# not invited to and vice versa # not invited to and vice versa
@ -737,7 +938,7 @@ class SlidingSyncHandler:
if filters.room_types is not None or filters.not_room_types is not None: if filters.room_types is not None or filters.not_room_types is not None:
# Make a copy so we don't run into an error: `Set changed size during # Make a copy so we don't run into an error: `Set changed size during
# iteration`, when we filter out and remove items # iteration`, when we filter out and remove items
for room_id in list(filtered_room_id_set): for room_id in filtered_room_id_set.copy():
create_event = await self.store.get_create_event_for_room(room_id) create_event = await self.store.get_create_event_for_room(room_id)
room_type = create_event.content.get(EventContentFields.ROOM_TYPE) room_type = create_event.content.get(EventContentFields.ROOM_TYPE)
if ( if (
@ -843,7 +1044,7 @@ class SlidingSyncHandler:
# Assemble the list of timeline events # Assemble the list of timeline events
# #
# It would be nice to make the `rooms` response more uniform regardless of # FIXME: It would be nice to make the `rooms` response more uniform regardless of
# membership. Currently, we have to make all of these optional because # membership. Currently, we have to make all of these optional because
# `invite`/`knock` rooms only have `stripped_state`. See # `invite`/`knock` rooms only have `stripped_state`. See
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
@ -1010,6 +1211,136 @@ class SlidingSyncHandler:
# state reset happened. Perhaps we should indicate this by setting `initial: # state reset happened. Perhaps we should indicate this by setting `initial:
# True` and empty `required_state`. # True` and empty `required_state`.
# TODO: Since we can't determine whether we've already sent a room down this
# Sliding Sync connection before (we plan to add this optimization in the
# future), we're always returning the requested room state instead of
# updates.
initial = True
# Fetch the required state for the room
#
# No `required_state` for invite/knock rooms (just `stripped_state`)
#
# FIXME: It would be nice to make the `rooms` response more uniform regardless
# of membership. Currently, we have to make this optional because
# `invite`/`knock` rooms only have `stripped_state`. See
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
room_state: Optional[StateMap[EventBase]] = None
if rooms_membership_for_user_at_to_token.membership not in (
Membership.INVITE,
Membership.KNOCK,
):
# Calculate the `StateFilter` based on the `required_state` for the room
state_filter: Optional[StateFilter] = StateFilter.none()
# If we have a double wildcard ("*", "*") in the `required_state`, we need
# to fetch all state for the room
#
# Note: MSC3575 describes different behavior to how we're handling things
# here but since it's not wrong to return more state than requested
# (`required_state` is just the minimum requested), it doesn't matter if we
# include more than client wanted. This complexity is also under scrutiny,
# see
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1185109050
#
# > One unique exception is when you request all state events via ["*", "*"]. When used,
# > all state events are returned by default, and additional entries FILTER OUT the returned set
# > of state events. These additional entries cannot use '*' themselves.
# > For example, ["*", "*"], ["m.room.member", "@alice:example.com"] will _exclude_ every m.room.member
# > event _except_ for @alice:example.com, and include every other state event.
# > In addition, ["*", "*"], ["m.space.child", "*"] is an error, the m.space.child filter is not
# > required as it would have been returned anyway.
# >
# > -- MSC3575 (https://github.com/matrix-org/matrix-spec-proposals/pull/3575)
if StateValues.WILDCARD in room_sync_config.required_state_map.get(
StateValues.WILDCARD, set()
):
state_filter = StateFilter.all()
# TODO: `StateFilter` currently doesn't support wildcard event types. We're
# currently working around this by returning all state to the client but it
# would be nice to fetch less from the database and return just what the
# client wanted.
elif (
room_sync_config.required_state_map.get(StateValues.WILDCARD)
is not None
):
state_filter = StateFilter.all()
else:
required_state_types: List[Tuple[str, Optional[str]]] = []
for (
state_type,
state_key_set,
) in room_sync_config.required_state_map.items():
for state_key in state_key_set:
if state_key == StateValues.WILDCARD:
# `None` is a wildcard in the `StateFilter`
required_state_types.append((state_type, None))
# We need to fetch all relevant people when we're lazy-loading membership
elif (
state_type == EventTypes.Member
and state_key == StateValues.LAZY
):
# Everyone in the timeline is relevant
timeline_membership: Set[str] = set()
if timeline_events is not None:
for timeline_event in timeline_events:
timeline_membership.add(timeline_event.sender)
for user_id in timeline_membership:
required_state_types.append(
(EventTypes.Member, user_id)
)
# FIXME: We probably also care about invite, ban, kick, targets, etc
# but the spec only mentions "senders".
else:
required_state_types.append((state_type, state_key))
state_filter = StateFilter.from_types(required_state_types)
# We can skip fetching state if we don't need any
if state_filter != StateFilter.none():
# We can return all of the state that was requested if we're doing an
# initial sync
if initial:
# People shouldn't see past their leave/ban event
if rooms_membership_for_user_at_to_token.membership in (
Membership.LEAVE,
Membership.BAN,
):
room_state = await self.storage_controllers.state.get_state_at(
room_id,
stream_position=to_token.copy_and_replace(
StreamKeyType.ROOM,
rooms_membership_for_user_at_to_token.event_pos.to_room_stream_token(),
),
state_filter=state_filter,
# Partially-stated rooms should have all state events except for
# the membership events and since we've already excluded
# partially-stated rooms unless `required_state` only has
# `["m.room.member", "$LAZY"]` for membership, we should be able
# to retrieve everything requested. Plus we don't want to block
# the whole sync waiting for this one room.
await_full_state=False,
)
# Otherwise, we can get the latest current state in the room
else:
room_state = await self.storage_controllers.state.get_current_state(
room_id,
state_filter,
# Partially-stated rooms should have all state events except for
# the membership events and since we've already excluded
# partially-stated rooms unless `required_state` only has
# `["m.room.member", "$LAZY"]` for membership, we should be able
# to retrieve everything requested. Plus we don't want to block
# the whole sync waiting for this one room.
await_full_state=False,
)
# TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token`
else:
# TODO: Once we can figure out if we've sent a room down this connection before,
# we can return updates instead of the full required state.
raise NotImplementedError()
return SlidingSyncResult.RoomResult( return SlidingSyncResult.RoomResult(
# TODO: Dummy value # TODO: Dummy value
name=None, name=None,
@ -1017,20 +1348,16 @@ class SlidingSyncHandler:
avatar=None, avatar=None,
# TODO: Dummy value # TODO: Dummy value
heroes=None, heroes=None,
# TODO: Since we can't determine whether we've already sent a room down this
# Sliding Sync connection before (we plan to add this optimization in the
# future), we're always returning the requested room state instead of
# updates.
initial=True,
# TODO: Dummy value
required_state=[],
timeline_events=timeline_events,
bundled_aggregations=bundled_aggregations,
# TODO: Dummy value # TODO: Dummy value
is_dm=False, is_dm=False,
initial=initial,
required_state=list(room_state.values()) if room_state else None,
timeline_events=timeline_events,
bundled_aggregations=bundled_aggregations,
stripped_state=stripped_state, stripped_state=stripped_state,
prev_batch=prev_batch_token, prev_batch=prev_batch_token,
limited=limited, limited=limited,
num_live=num_live,
# TODO: Dummy values # TODO: Dummy values
joined_count=0, joined_count=0,
invited_count=0, invited_count=0,
@ -1039,5 +1366,4 @@ class SlidingSyncHandler:
# (encrypted rooms). # (encrypted rooms).
notification_count=0, notification_count=0,
highlight_count=0, highlight_count=0,
num_live=num_live,
) )

View File

@ -1352,7 +1352,7 @@ class SyncHandler:
await_full_state = True await_full_state = True
lazy_load_members = False lazy_load_members = False
state_at_timeline_end = await self._state_storage_controller.get_state_at( state_at_timeline_end = await self._state_storage_controller.get_state_ids_at(
room_id, room_id,
stream_position=end_token, stream_position=end_token,
state_filter=state_filter, state_filter=state_filter,
@ -1480,12 +1480,14 @@ class SyncHandler:
else: else:
# We can get here if the user has ignored the senders of all # We can get here if the user has ignored the senders of all
# the recent events. # the recent events.
state_at_timeline_start = await self._state_storage_controller.get_state_at( state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_at(
room_id, room_id,
stream_position=end_token, stream_position=end_token,
state_filter=state_filter, state_filter=state_filter,
await_full_state=await_full_state, await_full_state=await_full_state,
) )
)
if batch.limited: if batch.limited:
# for now, we disable LL for gappy syncs - see # for now, we disable LL for gappy syncs - see
@ -1502,14 +1504,14 @@ class SyncHandler:
# about them). # about them).
state_filter = StateFilter.all() state_filter = StateFilter.all()
state_at_previous_sync = await self._state_storage_controller.get_state_at( state_at_previous_sync = await self._state_storage_controller.get_state_ids_at(
room_id, room_id,
stream_position=since_token, stream_position=since_token,
state_filter=state_filter, state_filter=state_filter,
await_full_state=await_full_state, await_full_state=await_full_state,
) )
state_at_timeline_end = await self._state_storage_controller.get_state_at( state_at_timeline_end = await self._state_storage_controller.get_state_ids_at(
room_id, room_id,
stream_position=end_token, stream_position=end_token,
state_filter=state_filter, state_filter=state_filter,
@ -2508,7 +2510,7 @@ class SyncHandler:
continue continue
if room_id in sync_result_builder.joined_room_ids or has_join: if room_id in sync_result_builder.joined_room_ids or has_join:
old_state_ids = await self._state_storage_controller.get_state_at( old_state_ids = await self._state_storage_controller.get_state_ids_at(
room_id, room_id,
since_token, since_token,
state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]), state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]),
@ -2539,7 +2541,7 @@ class SyncHandler:
else: else:
if not old_state_ids: if not old_state_ids:
old_state_ids = ( old_state_ids = (
await self._state_storage_controller.get_state_at( await self._state_storage_controller.get_state_ids_at(
room_id, room_id,
since_token, since_token,
state_filter=StateFilter.from_types( state_filter=StateFilter.from_types(

View File

@ -996,7 +996,7 @@ class SlidingSyncRestServlet(RestServlet):
if room_result.initial: if room_result.initial:
serialized_rooms[room_id]["initial"] = room_result.initial serialized_rooms[room_id]["initial"] = room_result.initial
# This will omitted for invite/knock rooms with `stripped_state` # This will be omitted for invite/knock rooms with `stripped_state`
if room_result.required_state is not None: if room_result.required_state is not None:
serialized_required_state = ( serialized_required_state = (
await self.event_serializer.serialize_events( await self.event_serializer.serialize_events(
@ -1007,7 +1007,7 @@ class SlidingSyncRestServlet(RestServlet):
) )
serialized_rooms[room_id]["required_state"] = serialized_required_state serialized_rooms[room_id]["required_state"] = serialized_required_state
# This will omitted for invite/knock rooms with `stripped_state` # This will be omitted for invite/knock rooms with `stripped_state`
if room_result.timeline_events is not None: if room_result.timeline_events is not None:
serialized_timeline = await self.event_serializer.serialize_events( serialized_timeline = await self.event_serializer.serialize_events(
room_result.timeline_events, room_result.timeline_events,
@ -1017,17 +1017,17 @@ class SlidingSyncRestServlet(RestServlet):
) )
serialized_rooms[room_id]["timeline"] = serialized_timeline serialized_rooms[room_id]["timeline"] = serialized_timeline
# This will omitted for invite/knock rooms with `stripped_state` # This will be omitted for invite/knock rooms with `stripped_state`
if room_result.limited is not None: if room_result.limited is not None:
serialized_rooms[room_id]["limited"] = room_result.limited serialized_rooms[room_id]["limited"] = room_result.limited
# This will omitted for invite/knock rooms with `stripped_state` # This will be omitted for invite/knock rooms with `stripped_state`
if room_result.prev_batch is not None: if room_result.prev_batch is not None:
serialized_rooms[room_id]["prev_batch"] = ( serialized_rooms[room_id]["prev_batch"] = (
await room_result.prev_batch.to_string(self.store) await room_result.prev_batch.to_string(self.store)
) )
# This will omitted for invite/knock rooms with `stripped_state` # This will be omitted for invite/knock rooms with `stripped_state`
if room_result.num_live is not None: if room_result.num_live is not None:
serialized_rooms[room_id]["num_live"] = room_result.num_live serialized_rooms[room_id]["num_live"] = room_result.num_live

View File

@ -409,7 +409,7 @@ class StateStorageController:
return state_ids return state_ids
async def get_state_at( async def get_state_ids_at(
self, self,
room_id: str, room_id: str,
stream_position: StreamToken, stream_position: StreamToken,
@ -460,6 +460,30 @@ class StateStorageController:
) )
return state return state
@trace
@tag_args
async def get_state_at(
self,
room_id: str,
stream_position: StreamToken,
state_filter: Optional[StateFilter] = None,
await_full_state: bool = True,
) -> StateMap[EventBase]:
"""Same as `get_state_ids_at` but also fetches the events"""
state_map_ids = await self.get_state_ids_at(
room_id, stream_position, state_filter, await_full_state
)
event_map = await self.stores.main.get_events(list(state_map_ids.values()))
state_map = {}
for key, event_id in state_map_ids.items():
event = event_map.get(event_id)
if event:
state_map[key] = event
return state_map
@trace @trace
@tag_args @tag_args
async def get_state_for_groups( async def get_state_for_groups(

View File

@ -156,6 +156,8 @@ class SlidingSyncResult:
avatar: Room avatar avatar: Room avatar
heroes: List of stripped membership events (containing `user_id` and optionally heroes: List of stripped membership events (containing `user_id` and optionally
`avatar_url` and `displayname`) for the users used to calculate the room name. `avatar_url` and `displayname`) for the users used to calculate the room name.
is_dm: Flag to specify whether the room is a direct-message room (most likely
between two people).
initial: Flag which is set when this is the first time the server is sending this initial: Flag which is set when this is the first time the server is sending this
data on this connection. Clients can use this flag to replace or update data on this connection. Clients can use this flag to replace or update
their local state. When there is an update, servers MUST omit this flag their local state. When there is an update, servers MUST omit this flag
@ -167,8 +169,6 @@ class SlidingSyncResult:
the timeline events above. This allows clients to show accurate reaction the timeline events above. This allows clients to show accurate reaction
counts (or edits, threads), even if some of the reaction events were skipped counts (or edits, threads), even if some of the reaction events were skipped
over in a gappy sync. over in a gappy sync.
is_dm: Flag to specify whether the room is a direct-message room (most likely
between two people).
stripped_state: Stripped state events (for rooms where the usre is stripped_state: Stripped state events (for rooms where the usre is
invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2, invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2,
absent on joined/left rooms absent on joined/left rooms
@ -176,6 +176,13 @@ class SlidingSyncResult:
`/rooms/<room_id>/messages` API to retrieve earlier messages. `/rooms/<room_id>/messages` API to retrieve earlier messages.
limited: True if their are more events than fit between the given position and now. limited: True if their are more events than fit between the given position and now.
Sync again to get more. Sync again to get more.
num_live: The number of timeline events which have just occurred and are not historical.
The last N events are 'live' and should be treated as such. This is mostly
useful to determine whether a given @mention event should make a noise or not.
Clients cannot rely solely on the absence of `initial: true` to determine live
events because if a room not in the sliding window bumps into the window because
of an @mention it will have `initial: true` yet contain a single live event
(with potentially other old events in the timeline).
joined_count: The number of users with membership of join, including the client's joined_count: The number of users with membership of join, including the client's
own user ID. (same as sync `v2 m.joined_member_count`) own user ID. (same as sync `v2 m.joined_member_count`)
invited_count: The number of users with membership of invite. (same as sync v2 invited_count: The number of users with membership of invite. (same as sync v2
@ -184,37 +191,30 @@ class SlidingSyncResult:
as sync v2) as sync v2)
highlight_count: The number of unread notifications for this room with the highlight highlight_count: The number of unread notifications for this room with the highlight
flag set. (same as sync v2) flag set. (same as sync v2)
num_live: The number of timeline events which have just occurred and are not historical.
The last N events are 'live' and should be treated as such. This is mostly
useful to determine whether a given @mention event should make a noise or not.
Clients cannot rely solely on the absence of `initial: true` to determine live
events because if a room not in the sliding window bumps into the window because
of an @mention it will have `initial: true` yet contain a single live event
(with potentially other old events in the timeline).
""" """
name: Optional[str] name: Optional[str]
avatar: Optional[str] avatar: Optional[str]
heroes: Optional[List[EventBase]] heroes: Optional[List[EventBase]]
is_dm: bool
initial: bool initial: bool
# Only optional because it won't be included for invite/knock rooms with `stripped_state` # Only optional because it won't be included for invite/knock rooms with `stripped_state`
required_state: Optional[List[EventBase]] required_state: Optional[List[EventBase]]
# Only optional because it won't be included for invite/knock rooms with `stripped_state` # Only optional because it won't be included for invite/knock rooms with `stripped_state`
timeline_events: Optional[List[EventBase]] timeline_events: Optional[List[EventBase]]
bundled_aggregations: Optional[Dict[str, "BundledAggregations"]] bundled_aggregations: Optional[Dict[str, "BundledAggregations"]]
is_dm: bool
# Optional because it's only relevant to invite/knock rooms # Optional because it's only relevant to invite/knock rooms
stripped_state: Optional[List[JsonDict]] stripped_state: Optional[List[JsonDict]]
# Only optional because it won't be included for invite/knock rooms with `stripped_state` # Only optional because it won't be included for invite/knock rooms with `stripped_state`
prev_batch: Optional[StreamToken] prev_batch: Optional[StreamToken]
# Only optional because it won't be included for invite/knock rooms with `stripped_state` # Only optional because it won't be included for invite/knock rooms with `stripped_state`
limited: Optional[bool] limited: Optional[bool]
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
num_live: Optional[int]
joined_count: int joined_count: int
invited_count: int invited_count: int
notification_count: int notification_count: int
highlight_count: int highlight_count: int
# Only optional because it won't be included for invite/knock rooms with `stripped_state`
num_live: Optional[int]
@attr.s(slots=True, frozen=True, auto_attribs=True) @attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingWindowList: class SlidingWindowList:

View File

@ -18,6 +18,8 @@
# #
# #
import logging import logging
from copy import deepcopy
from typing import Optional
from unittest.mock import patch from unittest.mock import patch
from parameterized import parameterized from parameterized import parameterized
@ -33,20 +35,550 @@ from synapse.api.constants import (
RoomTypes, RoomTypes,
) )
from synapse.api.room_versions import RoomVersions from synapse.api.room_versions import RoomVersions
from synapse.handlers.sliding_sync import SlidingSyncConfig from synapse.handlers.sliding_sync import RoomSyncConfig, StateValues
from synapse.rest import admin from synapse.rest import admin
from synapse.rest.client import knock, login, room from synapse.rest.client import knock, login, room
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import JsonDict, UserID from synapse.types import JsonDict, UserID
from synapse.types.handlers import SlidingSyncConfig
from synapse.util import Clock from synapse.util import Clock
from tests.replication._base import BaseMultiWorkerStreamTestCase from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.unittest import HomeserverTestCase from tests.unittest import HomeserverTestCase, TestCase
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class RoomSyncConfigTestCase(TestCase):
def _assert_room_config_equal(
self,
actual: RoomSyncConfig,
expected: RoomSyncConfig,
message_prefix: Optional[str] = None,
) -> None:
self.assertEqual(actual.timeline_limit, expected.timeline_limit, message_prefix)
# `self.assertEqual(...)` works fine to catch differences but the output is
# almost impossible to read because of the way it truncates the output and the
# order doesn't actually matter.
self.assertCountEqual(
actual.required_state_map, expected.required_state_map, message_prefix
)
for event_type, expected_state_keys in expected.required_state_map.items():
self.assertCountEqual(
actual.required_state_map[event_type],
expected_state_keys,
f"{message_prefix}: Mismatch for {event_type}",
)
@parameterized.expand(
[
(
"from_list_config",
"""
Test that we can convert a `SlidingSyncConfig.SlidingSyncList` to a
`RoomSyncConfig`.
""",
# Input
SlidingSyncConfig.SlidingSyncList(
timeline_limit=10,
required_state=[
(EventTypes.Name, ""),
(EventTypes.Member, "@foo"),
(EventTypes.Member, "@bar"),
(EventTypes.Member, "@baz"),
(EventTypes.CanonicalAlias, ""),
],
),
# Expected
RoomSyncConfig(
timeline_limit=10,
required_state_map={
EventTypes.Name: {""},
EventTypes.Member: {
"@foo",
"@bar",
"@baz",
},
EventTypes.CanonicalAlias: {""},
},
),
),
(
"from_room_subscription",
"""
Test that we can convert a `SlidingSyncConfig.RoomSubscription` to a
`RoomSyncConfig`.
""",
# Input
SlidingSyncConfig.RoomSubscription(
timeline_limit=10,
required_state=[
(EventTypes.Name, ""),
(EventTypes.Member, "@foo"),
(EventTypes.Member, "@bar"),
(EventTypes.Member, "@baz"),
(EventTypes.CanonicalAlias, ""),
],
),
# Expected
RoomSyncConfig(
timeline_limit=10,
required_state_map={
EventTypes.Name: {""},
EventTypes.Member: {
"@foo",
"@bar",
"@baz",
},
EventTypes.CanonicalAlias: {""},
},
),
),
(
"wildcard",
"""
Test that a wildcard (*) for both the `event_type` and `state_key` will override
all other values.
Note: MSC3575 describes different behavior to how we're handling things here but
since it's not wrong to return more state than requested (`required_state` is
just the minimum requested), it doesn't matter if we include things that the
client wanted excluded. This complexity is also under scrutiny, see
https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1185109050
> One unique exception is when you request all state events via ["*", "*"]. When used,
> all state events are returned by default, and additional entries FILTER OUT the returned set
> of state events. These additional entries cannot use '*' themselves.
> For example, ["*", "*"], ["m.room.member", "@alice:example.com"] will _exclude_ every m.room.member
> event _except_ for @alice:example.com, and include every other state event.
> In addition, ["*", "*"], ["m.space.child", "*"] is an error, the m.space.child filter is not
> required as it would have been returned anyway.
>
> -- MSC3575 (https://github.com/matrix-org/matrix-spec-proposals/pull/3575)
""",
# Input
SlidingSyncConfig.SlidingSyncList(
timeline_limit=10,
required_state=[
(EventTypes.Name, ""),
(StateValues.WILDCARD, StateValues.WILDCARD),
(EventTypes.Member, "@foo"),
(EventTypes.CanonicalAlias, ""),
],
),
# Expected
RoomSyncConfig(
timeline_limit=10,
required_state_map={
StateValues.WILDCARD: {StateValues.WILDCARD},
},
),
),
(
"wildcard_type",
"""
Test that a wildcard (*) as a `event_type` will override all other values for the
same `state_key`.
""",
# Input
SlidingSyncConfig.SlidingSyncList(
timeline_limit=10,
required_state=[
(EventTypes.Name, ""),
(StateValues.WILDCARD, ""),
(EventTypes.Member, "@foo"),
(EventTypes.CanonicalAlias, ""),
],
),
# Expected
RoomSyncConfig(
timeline_limit=10,
required_state_map={
StateValues.WILDCARD: {""},
EventTypes.Member: {"@foo"},
},
),
),
(
"multiple_wildcard_type",
"""
Test that multiple wildcard (*) as a `event_type` will override all other values
for the same `state_key`.
""",
# Input
SlidingSyncConfig.SlidingSyncList(
timeline_limit=10,
required_state=[
(EventTypes.Name, ""),
(StateValues.WILDCARD, ""),
(EventTypes.Member, "@foo"),
(StateValues.WILDCARD, "@foo"),
("org.matrix.personal_count", "@foo"),
(EventTypes.Member, "@bar"),
(EventTypes.CanonicalAlias, ""),
],
),
# Expected
RoomSyncConfig(
timeline_limit=10,
required_state_map={
StateValues.WILDCARD: {
"",
"@foo",
},
EventTypes.Member: {"@bar"},
},
),
),
(
"wildcard_state_key",
"""
Test that a wildcard (*) as a `state_key` will override all other values for the
same `event_type`.
""",
# Input
SlidingSyncConfig.SlidingSyncList(
timeline_limit=10,
required_state=[
(EventTypes.Name, ""),
(EventTypes.Member, "@foo"),
(EventTypes.Member, StateValues.WILDCARD),
(EventTypes.Member, "@bar"),
(EventTypes.Member, StateValues.LAZY),
(EventTypes.Member, "@baz"),
(EventTypes.CanonicalAlias, ""),
],
),
# Expected
RoomSyncConfig(
timeline_limit=10,
required_state_map={
EventTypes.Name: {""},
EventTypes.Member: {
StateValues.WILDCARD,
},
EventTypes.CanonicalAlias: {""},
},
),
),
(
"wildcard_merge",
"""
Test that a wildcard (*) entries for the `event_type` and another one for
`state_key` will play together.
""",
# Input
SlidingSyncConfig.SlidingSyncList(
timeline_limit=10,
required_state=[
(EventTypes.Name, ""),
(StateValues.WILDCARD, ""),
(EventTypes.Member, "@foo"),
(EventTypes.Member, StateValues.WILDCARD),
(EventTypes.Member, "@bar"),
(EventTypes.CanonicalAlias, ""),
],
),
# Expected
RoomSyncConfig(
timeline_limit=10,
required_state_map={
StateValues.WILDCARD: {""},
EventTypes.Member: {StateValues.WILDCARD},
},
),
),
(
"wildcard_merge2",
"""
Test that an all wildcard ("*", "*") entry will override any other
values (including other wildcards).
""",
# Input
SlidingSyncConfig.SlidingSyncList(
timeline_limit=10,
required_state=[
(EventTypes.Name, ""),
(StateValues.WILDCARD, ""),
(EventTypes.Member, StateValues.WILDCARD),
(EventTypes.Member, "@foo"),
# One of these should take precedence over everything else
(StateValues.WILDCARD, StateValues.WILDCARD),
(StateValues.WILDCARD, StateValues.WILDCARD),
(EventTypes.CanonicalAlias, ""),
],
),
# Expected
RoomSyncConfig(
timeline_limit=10,
required_state_map={
StateValues.WILDCARD: {StateValues.WILDCARD},
},
),
),
(
"lazy_members",
"""
`$LAZY` room members should just be another additional key next to other
explicit keys. We will unroll the special `$LAZY` meaning later.
""",
# Input
SlidingSyncConfig.SlidingSyncList(
timeline_limit=10,
required_state=[
(EventTypes.Name, ""),
(EventTypes.Member, "@foo"),
(EventTypes.Member, "@bar"),
(EventTypes.Member, StateValues.LAZY),
(EventTypes.Member, "@baz"),
(EventTypes.CanonicalAlias, ""),
],
),
# Expected
RoomSyncConfig(
timeline_limit=10,
required_state_map={
EventTypes.Name: {""},
EventTypes.Member: {
"@foo",
"@bar",
StateValues.LAZY,
"@baz",
},
EventTypes.CanonicalAlias: {""},
},
),
),
]
)
def test_from_room_config(
self,
_test_label: str,
_test_description: str,
room_params: SlidingSyncConfig.CommonRoomParameters,
expected_room_sync_config: RoomSyncConfig,
) -> None:
"""
Test `RoomSyncConfig.from_room_config(room_params)` will result in the `expected_room_sync_config`.
"""
room_sync_config = RoomSyncConfig.from_room_config(room_params)
self._assert_room_config_equal(
room_sync_config,
expected_room_sync_config,
)
@parameterized.expand(
[
(
"no_direct_overlap",
# A
RoomSyncConfig(
timeline_limit=9,
required_state_map={
EventTypes.Name: {""},
EventTypes.Member: {
"@foo",
"@bar",
},
},
),
# B
RoomSyncConfig(
timeline_limit=10,
required_state_map={
EventTypes.Member: {
StateValues.LAZY,
"@baz",
},
EventTypes.CanonicalAlias: {""},
},
),
# Expected
RoomSyncConfig(
timeline_limit=10,
required_state_map={
EventTypes.Name: {""},
EventTypes.Member: {
"@foo",
"@bar",
StateValues.LAZY,
"@baz",
},
EventTypes.CanonicalAlias: {""},
},
),
),
(
"wildcard_overlap",
# A
RoomSyncConfig(
timeline_limit=10,
required_state_map={
StateValues.WILDCARD: {StateValues.WILDCARD},
},
),
# B
RoomSyncConfig(
timeline_limit=9,
required_state_map={
EventTypes.Dummy: {StateValues.WILDCARD},
StateValues.WILDCARD: {"@bar"},
EventTypes.Member: {"@foo"},
},
),
# Expected
RoomSyncConfig(
timeline_limit=10,
required_state_map={
StateValues.WILDCARD: {StateValues.WILDCARD},
},
),
),
(
"state_type_wildcard_overlap",
# A
RoomSyncConfig(
timeline_limit=10,
required_state_map={
EventTypes.Dummy: {"dummy"},
StateValues.WILDCARD: {
"",
"@foo",
},
EventTypes.Member: {"@bar"},
},
),
# B
RoomSyncConfig(
timeline_limit=9,
required_state_map={
EventTypes.Dummy: {"dummy2"},
StateValues.WILDCARD: {
"",
"@bar",
},
EventTypes.Member: {"@foo"},
},
),
# Expected
RoomSyncConfig(
timeline_limit=10,
required_state_map={
EventTypes.Dummy: {
"dummy",
"dummy2",
},
StateValues.WILDCARD: {
"",
"@foo",
"@bar",
},
},
),
),
(
"state_key_wildcard_overlap",
# A
RoomSyncConfig(
timeline_limit=10,
required_state_map={
EventTypes.Dummy: {"dummy"},
EventTypes.Member: {StateValues.WILDCARD},
"org.matrix.flowers": {StateValues.WILDCARD},
},
),
# B
RoomSyncConfig(
timeline_limit=9,
required_state_map={
EventTypes.Dummy: {StateValues.WILDCARD},
EventTypes.Member: {StateValues.WILDCARD},
"org.matrix.flowers": {"tulips"},
},
),
# Expected
RoomSyncConfig(
timeline_limit=10,
required_state_map={
EventTypes.Dummy: {StateValues.WILDCARD},
EventTypes.Member: {StateValues.WILDCARD},
"org.matrix.flowers": {StateValues.WILDCARD},
},
),
),
(
"state_type_and_state_key_wildcard_merge",
# A
RoomSyncConfig(
timeline_limit=10,
required_state_map={
EventTypes.Dummy: {"dummy"},
StateValues.WILDCARD: {
"",
"@foo",
},
EventTypes.Member: {"@bar"},
},
),
# B
RoomSyncConfig(
timeline_limit=9,
required_state_map={
EventTypes.Dummy: {"dummy2"},
StateValues.WILDCARD: {""},
EventTypes.Member: {StateValues.WILDCARD},
},
),
# Expected
RoomSyncConfig(
timeline_limit=10,
required_state_map={
EventTypes.Dummy: {
"dummy",
"dummy2",
},
StateValues.WILDCARD: {
"",
"@foo",
},
EventTypes.Member: {StateValues.WILDCARD},
},
),
),
]
)
def test_combine_room_sync_config(
self,
_test_label: str,
a: RoomSyncConfig,
b: RoomSyncConfig,
expected: RoomSyncConfig,
) -> None:
"""
Combine A into B and B into A to make sure we get the same result.
"""
# Since we're mutating these in place, make a copy for each of our trials
room_sync_config_a = deepcopy(a)
room_sync_config_b = deepcopy(b)
# Combine B into A
room_sync_config_a.combine_room_sync_config(room_sync_config_b)
self._assert_room_config_equal(room_sync_config_a, expected, "B into A")
# Since we're mutating these in place, make a copy for each of our trials
room_sync_config_a = deepcopy(a)
room_sync_config_b = deepcopy(b)
# Combine A into B
room_sync_config_b.combine_room_sync_config(room_sync_config_a)
self._assert_room_config_equal(room_sync_config_b, expected, "A into B")
class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
""" """
Tests Sliding Sync handler `get_sync_room_ids_for_user()` to make sure it returns Tests Sliding Sync handler `get_sync_room_ids_for_user()` to make sure it returns

View File

@ -20,7 +20,7 @@
# #
import json import json
import logging import logging
from typing import Dict, List from typing import AbstractSet, Any, Dict, Iterable, List, Optional
from parameterized import parameterized, parameterized_class from parameterized import parameterized, parameterized_class
@ -32,9 +32,12 @@ from synapse.api.constants import (
EventContentFields, EventContentFields,
EventTypes, EventTypes,
HistoryVisibility, HistoryVisibility,
Membership,
ReceiptTypes, ReceiptTypes,
RelationTypes, RelationTypes,
) )
from synapse.events import EventBase
from synapse.handlers.sliding_sync import StateValues
from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken, UserID from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken, UserID
@ -45,6 +48,7 @@ from tests.federation.transport.test_knocking import (
KnockingStrippedStateEventHelperMixin, KnockingStrippedStateEventHelperMixin,
) )
from tests.server import TimedOutException from tests.server import TimedOutException
from tests.test_utils.event_injection import mark_event_as_partial_state
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -1237,6 +1241,94 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
) )
self.store = hs.get_datastores().main self.store = hs.get_datastores().main
self.event_sources = hs.get_event_sources() self.event_sources = hs.get_event_sources()
self.storage_controllers = hs.get_storage_controllers()
def _assertRequiredStateIncludes(
self,
actual_required_state: Any,
expected_state_events: Iterable[EventBase],
exact: bool = False,
) -> None:
"""
Wrapper around `_assertIncludes` to give slightly better looking diff error
messages that include some context "$event_id (type, state_key)".
Args:
actual_required_state: The "required_state" of a room from a Sliding Sync
request response.
expected_state_events: The expected state events to be included in the
`actual_required_state`.
exact: Whether the actual state should be exactly equal to the expected
state (no extras).
"""
assert isinstance(actual_required_state, list)
for event in actual_required_state:
assert isinstance(event, dict)
self._assertIncludes(
{
f'{event["event_id"]} ("{event["type"]}", "{event["state_key"]}")'
for event in actual_required_state
},
{
f'{event.event_id} ("{event.type}", "{event.state_key}")'
for event in expected_state_events
},
exact=exact,
# Message to help understand the diff in context
message=str(actual_required_state),
)
def _assertIncludes(
self,
actual_items: AbstractSet[str],
expected_items: AbstractSet[str],
exact: bool = False,
message: Optional[str] = None,
) -> None:
"""
Assert that all of the `expected_items` are included in the `actual_items`.
This assert could also be called `assertContains`, `assertItemsInSet`
Args:
actual_items: The container
expected_items: The items to check for in the container
exact: Whether the actual state should be exactly equal to the expected
state (no extras).
message: Optional message to include in the failure message.
"""
# Check that each set has the same items
if exact and actual_items == expected_items:
return
# Check for a superset
elif not exact and actual_items >= expected_items:
return
expected_lines: List[str] = []
for expected_item in expected_items:
is_expected_in_actual = expected_item in actual_items
expected_lines.append(
"{} {}".format(" " if is_expected_in_actual else "?", expected_item)
)
actual_lines: List[str] = []
for actual_item in actual_items:
is_actual_in_expected = actual_item in expected_items
actual_lines.append(
"{} {}".format("+" if is_actual_in_expected else " ", actual_item)
)
newline = "\n"
expected_string = f"Expected items to be in actual ('?' = missing expected items):\n {{\n{newline.join(expected_lines)}\n }}"
actual_string = f"Actual ('+' = found expected items):\n {{\n{newline.join(actual_lines)}\n }}"
first_message = (
"Items must match exactly" if exact else "Some expected items are missing."
)
diff_message = f"{first_message}\n{expected_string}\n{actual_string}"
self.fail(f"{diff_message}\n{message}")
def _add_new_dm_to_global_account_data( def _add_new_dm_to_global_account_data(
self, source_user_id: str, target_user_id: str, target_room_id: str self, source_user_id: str, target_user_id: str, target_room_id: str
@ -2091,6 +2183,11 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
channel.json_body["rooms"][room_id1].get("prev_batch"), channel.json_body["rooms"][room_id1].get("prev_batch"),
channel.json_body["rooms"][room_id1], channel.json_body["rooms"][room_id1],
) )
# `required_state` is omitted for `invite` rooms with `stripped_state`
self.assertIsNone(
channel.json_body["rooms"][room_id1].get("required_state"),
channel.json_body["rooms"][room_id1],
)
# We should have some `stripped_state` so the potential joiner can identify the # We should have some `stripped_state` so the potential joiner can identify the
# room (we don't care about the order). # room (we don't care about the order).
self.assertCountEqual( self.assertCountEqual(
@ -2200,6 +2297,11 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
channel.json_body["rooms"][room_id1].get("prev_batch"), channel.json_body["rooms"][room_id1].get("prev_batch"),
channel.json_body["rooms"][room_id1], channel.json_body["rooms"][room_id1],
) )
# `required_state` is omitted for `invite` rooms with `stripped_state`
self.assertIsNone(
channel.json_body["rooms"][room_id1].get("required_state"),
channel.json_body["rooms"][room_id1],
)
# We should have some `stripped_state` so the potential joiner can identify the # We should have some `stripped_state` so the potential joiner can identify the
# room (we don't care about the order). # room (we don't care about the order).
self.assertCountEqual( self.assertCountEqual(
@ -2321,6 +2423,11 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
channel.json_body["rooms"][room_id1].get("prev_batch"), channel.json_body["rooms"][room_id1].get("prev_batch"),
channel.json_body["rooms"][room_id1], channel.json_body["rooms"][room_id1],
) )
# `required_state` is omitted for `invite` rooms with `stripped_state`
self.assertIsNone(
channel.json_body["rooms"][room_id1].get("required_state"),
channel.json_body["rooms"][room_id1],
)
# We should have some `stripped_state` so the potential joiner can identify the # We should have some `stripped_state` so the potential joiner can identify the
# room (we don't care about the order). # room (we don't care about the order).
self.assertCountEqual( self.assertCountEqual(
@ -2448,6 +2555,11 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
channel.json_body["rooms"][room_id1].get("prev_batch"), channel.json_body["rooms"][room_id1].get("prev_batch"),
channel.json_body["rooms"][room_id1], channel.json_body["rooms"][room_id1],
) )
# `required_state` is omitted for `invite` rooms with `stripped_state`
self.assertIsNone(
channel.json_body["rooms"][room_id1].get("required_state"),
channel.json_body["rooms"][room_id1],
)
# We should have some `stripped_state` so the potential joiner can identify the # We should have some `stripped_state` so the potential joiner can identify the
# room (we don't care about the order). # room (we don't care about the order).
self.assertCountEqual( self.assertCountEqual(
@ -2681,3 +2793,602 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
False, False,
channel.json_body["rooms"][room_id1], channel.json_body["rooms"][room_id1],
) )
def test_rooms_no_required_state(self) -> None:
"""
Empty `rooms.required_state` should not return any state events in the room
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
# Empty `required_state`
"required_state": [],
"timeline_limit": 0,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
# No `required_state` in response
self.assertIsNone(
channel.json_body["rooms"][room_id1].get("required_state"),
channel.json_body["rooms"][room_id1],
)
def test_rooms_required_state_initial_sync(self) -> None:
"""
Test `rooms.required_state` returns requested state events in the room during an
initial sync.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Create, ""],
[EventTypes.RoomHistoryVisibility, ""],
# This one doesn't exist in the room
[EventTypes.Tombstone, ""],
],
"timeline_limit": 0,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
channel.json_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
state_map[(EventTypes.RoomHistoryVisibility, "")],
},
exact=True,
)
def test_rooms_required_state_incremental_sync(self) -> None:
"""
Test `rooms.required_state` returns requested state events in the room during an
incremental sync.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
after_room_token = self.event_sources.get_current_token()
# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint
+ f"?pos={self.get_success(after_room_token.to_string(self.store))}",
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Create, ""],
[EventTypes.RoomHistoryVisibility, ""],
# This one doesn't exist in the room
[EventTypes.Tombstone, ""],
],
"timeline_limit": 0,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
# The returned state doesn't change from initial to incremental sync. In the
# future, we will only return updates but only if we've sent the room down the
# connection before.
self._assertRequiredStateIncludes(
channel.json_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
state_map[(EventTypes.RoomHistoryVisibility, "")],
},
exact=True,
)
def test_rooms_required_state_wildcard(self) -> None:
"""
Test `rooms.required_state` returns all state events when using wildcard `["*", "*"]`.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
self.helper.send_state(
room_id1,
event_type="org.matrix.foo_state",
state_key="",
body={"foo": "bar"},
tok=user2_tok,
)
self.helper.send_state(
room_id1,
event_type="org.matrix.foo_state",
state_key="namespaced",
body={"foo": "bar"},
tok=user2_tok,
)
# Make the Sliding Sync request with wildcards for the `event_type` and `state_key`
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[StateValues.WILDCARD, StateValues.WILDCARD],
],
"timeline_limit": 0,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
channel.json_body["rooms"][room_id1]["required_state"],
# We should see all the state events in the room
state_map.values(),
exact=True,
)
def test_rooms_required_state_wildcard_event_type(self) -> None:
"""
Test `rooms.required_state` returns relevant state events when using wildcard in
the event_type `["*", "foobarbaz"]`.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
self.helper.send_state(
room_id1,
event_type="org.matrix.foo_state",
state_key="",
body={"foo": "bar"},
tok=user2_tok,
)
self.helper.send_state(
room_id1,
event_type="org.matrix.foo_state",
state_key=user2_id,
body={"foo": "bar"},
tok=user2_tok,
)
# Make the Sliding Sync request with wildcards for the `event_type`
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[StateValues.WILDCARD, user2_id],
],
"timeline_limit": 0,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
# We expect at-least any state event with the `user2_id` as the `state_key`
self._assertRequiredStateIncludes(
channel.json_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user2_id)],
state_map[("org.matrix.foo_state", user2_id)],
},
# Ideally, this would be exact but we're currently returning all state
# events when the `event_type` is a wildcard.
exact=False,
)
def test_rooms_required_state_wildcard_state_key(self) -> None:
"""
Test `rooms.required_state` returns relevant state events when using wildcard in
the state_key `["foobarbaz","*"]`.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
# Make the Sliding Sync request with wildcards for the `state_key`
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Member, StateValues.WILDCARD],
],
"timeline_limit": 0,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
channel.json_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Member, user1_id)],
state_map[(EventTypes.Member, user2_id)],
},
exact=True,
)
def test_rooms_required_state_lazy_loading_room_members(self) -> None:
"""
Test `rooms.required_state` returns people relevant to the timeline when
lazy-loading room members, `["m.room.member","$LAZY"]`.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
user3_id = self.register_user("user3", "pass")
user3_tok = self.login(user3_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
self.helper.join(room_id1, user3_id, tok=user3_tok)
self.helper.send(room_id1, "1", tok=user2_tok)
self.helper.send(room_id1, "2", tok=user3_tok)
self.helper.send(room_id1, "3", tok=user2_tok)
# Make the Sliding Sync request with lazy loading for the room members
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Create, ""],
[EventTypes.Member, StateValues.LAZY],
],
"timeline_limit": 3,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
# Only user2 and user3 sent events in the 3 events we see in the `timeline`
self._assertRequiredStateIncludes(
channel.json_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
state_map[(EventTypes.Member, user2_id)],
state_map[(EventTypes.Member, user3_id)],
},
exact=True,
)
@parameterized.expand([(Membership.LEAVE,), (Membership.BAN,)])
def test_rooms_required_state_leave_ban(self, stop_membership: str) -> None:
"""
Test `rooms.required_state` should not return state past a leave/ban event.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
user3_id = self.register_user("user3", "pass")
user3_tok = self.login(user3_id, "pass")
from_token = self.event_sources.get_current_token()
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
self.helper.join(room_id1, user3_id, tok=user3_tok)
self.helper.send_state(
room_id1,
event_type="org.matrix.foo_state",
state_key="",
body={"foo": "bar"},
tok=user2_tok,
)
if stop_membership == Membership.LEAVE:
# User 1 leaves
self.helper.leave(room_id1, user1_id, tok=user1_tok)
elif stop_membership == Membership.BAN:
# User 1 is banned
self.helper.ban(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
# Change the state after user 1 leaves
self.helper.send_state(
room_id1,
event_type="org.matrix.foo_state",
state_key="",
body={"foo": "qux"},
tok=user2_tok,
)
self.helper.leave(room_id1, user3_id, tok=user3_tok)
# Make the Sliding Sync request with lazy loading for the room members
channel = self.make_request(
"POST",
self.sync_endpoint
+ f"?pos={self.get_success(from_token.to_string(self.store))}",
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Create, ""],
[EventTypes.Member, "*"],
["org.matrix.foo_state", ""],
],
"timeline_limit": 3,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
# Only user2 and user3 sent events in the 3 events we see in the `timeline`
self._assertRequiredStateIncludes(
channel.json_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
state_map[(EventTypes.Member, user1_id)],
state_map[(EventTypes.Member, user2_id)],
state_map[(EventTypes.Member, user3_id)],
state_map[("org.matrix.foo_state", "")],
},
exact=True,
)
def test_rooms_required_state_combine_superset(self) -> None:
"""
Test `rooms.required_state` is combined across lists and room subscriptions.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
self.helper.send_state(
room_id1,
event_type="org.matrix.foo_state",
state_key="",
body={"foo": "bar"},
tok=user2_tok,
)
# Make the Sliding Sync request with wildcards for the `state_key`
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Create, ""],
[EventTypes.Member, user1_id],
],
"timeline_limit": 0,
},
"bar-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Member, StateValues.WILDCARD],
["org.matrix.foo_state", ""],
],
"timeline_limit": 0,
},
}
# TODO: Room subscription should also combine with the `required_state`
# "room_subscriptions": {
# room_id1: {
# "required_state": [
# ["org.matrix.bar_state", ""]
# ],
# "timeline_limit": 0,
# }
# }
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
channel.json_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
state_map[(EventTypes.Member, user1_id)],
state_map[(EventTypes.Member, user2_id)],
state_map[("org.matrix.foo_state", "")],
},
exact=True,
)
def test_rooms_required_state_partial_state(self) -> None:
"""
Test partially-stated room are excluded unless `rooms.required_state` is
lazy-loading room members.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
_join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
join_response2 = self.helper.join(room_id2, user1_id, tok=user1_tok)
# Mark room2 as partial state
self.get_success(
mark_event_as_partial_state(self.hs, join_response2["event_id"], room_id2)
)
# Make the Sliding Sync request (NOT lazy-loading room members)
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Create, ""],
],
"timeline_limit": 0,
},
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
# Make sure the list includes room1 but room2 is excluded because it's still
# partially-stated
self.assertListEqual(
list(channel.json_body["lists"]["foo-list"]["ops"]),
[
{
"op": "SYNC",
"range": [0, 1],
"room_ids": [room_id1],
}
],
channel.json_body["lists"]["foo-list"],
)
# Make the Sliding Sync request (with lazy-loading room members)
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Create, ""],
# Lazy-load room members
[EventTypes.Member, StateValues.LAZY],
],
"timeline_limit": 0,
},
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
# The list should include both rooms now because we're lazy-loading room members
self.assertListEqual(
list(channel.json_body["lists"]["foo-list"]["ops"]),
[
{
"op": "SYNC",
"range": [0, 1],
"room_ids": [room_id2, room_id1],
}
],
channel.json_body["lists"]["foo-list"],
)

View File

@ -125,13 +125,15 @@ async def mark_event_as_partial_state(
in this table). in this table).
""" """
store = hs.get_datastores().main store = hs.get_datastores().main
await store.db_pool.simple_upsert( # Use the store helper to insert into the database so the caches are busted
table="partial_state_rooms", await store.store_partial_state_room(
keyvalues={"room_id": room_id}, room_id=room_id,
values={}, servers={hs.hostname},
insertion_values={"room_id": room_id}, device_lists_stream_id=0,
joined_via=hs.hostname,
) )
# FIXME: Bust the cache
await store.db_pool.simple_insert( await store.db_pool.simple_insert(
table="partial_state_events", table="partial_state_events",
values={ values={