Add room subscriptions to Sliding Sync /sync (#17432)

Add room subscriptions to Sliding Sync `/sync`

Based on
[MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575):
Sliding Sync

Currently, you can only subscribe to rooms you have had *any* membership
in before.

In the future, we will allow `world_readable` rooms to be subscribed to
without joining.
This commit is contained in:
Eric Eastwood 2024-07-15 04:37:10 -05:00 committed by GitHub
parent fb66e938b2
commit ab62aa09da
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 1489 additions and 358 deletions

View File

@ -0,0 +1 @@
Add room subscriptions to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

View File

@ -62,32 +62,79 @@ DEFAULT_BUMP_EVENT_TYPES = {
}
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _RoomMembershipForUser:
"""
Attributes:
room_id: The room ID of the membership event
event_id: The event ID of the membership event
event_pos: The stream position of the membership event
membership: The membership state of the user in the room
sender: The person who sent the membership event
newly_joined: Whether the user newly joined the room during the given token
range and is still joined to the room at the end of this range.
newly_left: Whether the user newly left (or kicked) the room during the given
token range and is still "leave" at the end of this range.
is_dm: Whether this user considers this room as a direct-message (DM) room
"""
room_id: str
# Optional because state resets can affect room membership without a corresponding event.
event_id: Optional[str]
# Even during a state reset which removes the user from the room, we expect this to
# be set because `current_state_delta_stream` will note the position that the reset
# happened.
event_pos: PersistedEventPosition
# Even during a state reset which removes the user from the room, we expect this to
# be set to `LEAVE` because we can make that assumption based on the situaton (see
# `get_current_state_delta_membership_changes_for_user(...)`)
membership: str
# Optional because state resets can affect room membership without a corresponding event.
sender: Optional[str]
newly_joined: bool
newly_left: bool
is_dm: bool
def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
return attr.evolve(self, **kwds)
def filter_membership_for_sync(
*, membership: str, user_id: str, sender: Optional[str]
*, user_id: str, room_membership_for_user: _RoomMembershipForUser
) -> bool:
"""
Returns True if the membership event should be included in the sync response,
otherwise False.
Attributes:
membership: The membership state of the user in the room.
user_id: The user ID that the membership applies to
sender: The person who sent the membership event
room_membership_for_user: Membership information for the user in the room
"""
# Everything except `Membership.LEAVE` because we want everything that's *still*
# relevant to the user. There are few more things to include in the sync response
# (newly_left) but those are handled separately.
membership = room_membership_for_user.membership
sender = room_membership_for_user.sender
newly_left = room_membership_for_user.newly_left
# We want to allow everything except rooms the user has left unless `newly_left`
# because we want everything that's *still* relevant to the user. We include
# `newly_left` rooms because the last event that the user should see is their own
# leave event.
#
# This logic includes kicks (leave events where the sender is not the same user) and
# can be read as "anything that isn't a leave or a leave with a different sender".
# A leave != kick. This logic includes kicks (leave events where the sender is not
# the same user).
#
# When `sender=None` and `membership=Membership.LEAVE`, it means that a state reset
# happened that removed the user from the room, or the user was the last person
# locally to leave the room which caused the server to leave the room. In both
# cases, we can just remove the rooms since they are no longer relevant to the user.
# They could still be added back later if they are `newly_left`.
return membership != Membership.LEAVE or sender not in (user_id, None)
# When `sender=None`, it means that a state reset happened that removed the user
# from the room without a corresponding leave event. We can just remove the rooms
# since they are no longer relevant to the user but will still appear if they are
# `newly_left`.
return (
# Anything except leave events
membership != Membership.LEAVE
# Unless...
or newly_left
# Allow kicks
or (membership == Membership.LEAVE and sender not in (user_id, None))
)
# We can't freeze this class because we want to update it in place with the
@ -281,31 +328,6 @@ class StateValues:
LAZY: Final = "$LAZY"
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _RoomMembershipForUser:
"""
Attributes:
event_id: The event ID of the membership event
event_pos: The stream position of the membership event
membership: The membership state of the user in the room
sender: The person who sent the membership event
newly_joined: Whether the user newly joined the room during the given token
range
is_dm: Whether this user considers this room as a direct-message (DM) room
"""
room_id: str
event_id: Optional[str]
event_pos: PersistedEventPosition
membership: str
sender: Optional[str]
newly_joined: bool
is_dm: bool
def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
return attr.evolve(self, **kwds)
class SlidingSyncHandler:
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
@ -424,18 +446,31 @@ class SlidingSyncHandler:
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
# Get all of the room IDs that the user should be able to see in the sync
# response
has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
has_room_subscriptions = (
sync_config.room_subscriptions is not None
and len(sync_config.room_subscriptions) > 0
)
if has_lists or has_room_subscriptions:
room_membership_for_user_map = (
await self.get_room_membership_for_user_at_to_token(
user=sync_config.user,
to_token=to_token,
from_token=from_token,
)
)
# Assemble sliding window lists
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
# Keep track of the rooms that we're going to display and need to fetch more
# info about
relevant_room_map: Dict[str, RoomSyncConfig] = {}
if sync_config.lists:
# Get all of the room IDs that the user should be able to see in the sync
# response
sync_room_map = await self.get_sync_room_ids_for_user(
sync_config.user,
from_token=from_token,
to_token=to_token,
if has_lists and sync_config.lists is not None:
sync_room_map = await self.filter_rooms_relevant_for_sync(
user=sync_config.user,
room_membership_for_user_map=room_membership_for_user_map,
)
for list_key, list_config in sync_config.lists.items():
@ -524,7 +559,35 @@ class SlidingSyncHandler:
ops=ops,
)
# TODO: if (sync_config.room_subscriptions):
# Handle room subscriptions
if has_room_subscriptions and sync_config.room_subscriptions is not None:
for room_id, room_subscription in sync_config.room_subscriptions.items():
room_membership_for_user_at_to_token = (
await self.check_room_subscription_allowed_for_user(
room_id=room_id,
room_membership_for_user_map=room_membership_for_user_map,
to_token=to_token,
)
)
# Skip this room if the user isn't allowed to see it
if not room_membership_for_user_at_to_token:
continue
room_membership_for_user_map[room_id] = (
room_membership_for_user_at_to_token
)
# Take the superset of the `RoomSyncConfig` for each room.
#
# Update our `relevant_room_map` with the room we're going to display
# and need to fetch more info about.
room_sync_config = RoomSyncConfig.from_room_config(room_subscription)
existing_room_sync_config = relevant_room_map.get(room_id)
if existing_room_sync_config is not None:
existing_room_sync_config.combine_room_sync_config(room_sync_config)
else:
relevant_room_map[room_id] = room_sync_config
# Fetch room data
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
@ -533,7 +596,9 @@ class SlidingSyncHandler:
user=sync_config.user,
room_id=room_id,
room_sync_config=room_sync_config,
room_membership_for_user_at_to_token=sync_room_map[room_id],
room_membership_for_user_at_to_token=room_membership_for_user_map[
room_id
],
from_token=from_token,
to_token=to_token,
)
@ -551,28 +616,23 @@ class SlidingSyncHandler:
extensions=extensions,
)
async def get_sync_room_ids_for_user(
async def get_room_membership_for_user_at_to_token(
self,
user: UserID,
to_token: StreamToken,
from_token: Optional[StreamToken] = None,
from_token: Optional[StreamToken],
) -> Dict[str, _RoomMembershipForUser]:
"""
Fetch room IDs that should be listed for this user in the sync response (the
full room list that will be filtered, sorted, and sliced).
Fetch room IDs that the user has had membership in (the full room list including
long-lost left rooms that will be filtered, sorted, and sliced).
We're looking for rooms where the user has the following state in the token
range (> `from_token` and <= `to_token`):
We're looking for rooms where the user has had any sort of membership in the
token range (> `from_token` and <= `to_token`)
- `invite`, `join`, `knock`, `ban` membership events
- Kicks (`leave` membership events where `sender` is different from the
`user_id`/`state_key`)
- `newly_left` (rooms that were left during the given token range)
- In order for bans/kicks to not show up in sync, you need to `/forget` those
rooms. This doesn't modify the event itself though and only adds the
`forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
to tell when a room was forgotten at the moment so we can't factor it into the
from/to range.
In order for bans/kicks to not show up, you need to `/forget` those rooms. This
doesn't modify the event itself though and only adds the `forgotten` flag to the
`room_memberships` table in Synapse. There isn't a way to tell when a room was
forgotten at the moment so we can't factor it into the token range.
Args:
user: User to fetch rooms for
@ -580,8 +640,8 @@ class SlidingSyncHandler:
from_token: The point in the stream to sync from.
Returns:
A dictionary of room IDs that should be listed in the sync response along
with membership information in that room at the time of `to_token`.
A dictionary of room IDs that the user has had membership in along with
membership information in that room at the time of `to_token`.
"""
user_id = user.to_string()
@ -592,9 +652,6 @@ class SlidingSyncHandler:
# We want to fetch any kind of membership (joined and left rooms) in order
# to get the `event_pos` of the latest room membership event for the
# user.
#
# We will filter out the rooms that don't belong below (see
# `filter_membership_for_sync`)
membership_list=Membership.LIST,
excluded_rooms=self.rooms_to_exclude_globally,
)
@ -614,7 +671,9 @@ class SlidingSyncHandler:
event_pos=room_for_user.event_pos,
membership=room_for_user.membership,
sender=room_for_user.sender,
# We will update these fields below to be accurate
newly_joined=False,
newly_left=False,
is_dm=False,
)
for room_for_user in room_for_user_list
@ -653,12 +712,10 @@ class SlidingSyncHandler:
# - 1a) Remove rooms that the user joined after the `to_token`
# - 1b) Add back rooms that the user left after the `to_token`
# - 1c) Update room membership events to the point in time of the `to_token`
# - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
# - 3) Figure out which rooms are `newly_joined`
# - 2) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`)
# - 3) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`)
# - 4) Figure out which rooms are DM's
# 1) -----------------------------------------------------
# 1) Fetch membership changes that fall in the range from `to_token` up to
# `membership_snapshot_token`
#
@ -717,7 +774,9 @@ class SlidingSyncHandler:
event_pos=first_membership_change_after_to_token.prev_event_pos,
membership=first_membership_change_after_to_token.prev_membership,
sender=first_membership_change_after_to_token.prev_sender,
# We will update these fields below to be accurate
newly_joined=False,
newly_left=False,
is_dm=False,
)
else:
@ -726,22 +785,6 @@ class SlidingSyncHandler:
# exact membership state and shouldn't rely on the current snapshot.
sync_room_id_set.pop(room_id, None)
# Filter the rooms that that we have updated room membership events to the point
# in time of the `to_token` (from the "1)" fixups)
filtered_sync_room_id_set = {
room_id: room_membership_for_user
for room_id, room_membership_for_user in sync_room_id_set.items()
if filter_membership_for_sync(
membership=room_membership_for_user.membership,
user_id=user_id,
sender=room_membership_for_user.sender,
)
}
# 2) -----------------------------------------------------
# We fix-up newly_left rooms after the first fixup because it may have removed
# some left rooms that we can figure out are newly_left in the following code
# 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
current_state_delta_membership_changes_in_from_to_range = []
if from_token:
@ -803,19 +846,40 @@ class SlidingSyncHandler:
if last_membership_change_in_from_to_range.membership == Membership.JOIN:
possibly_newly_joined_room_ids.add(room_id)
# 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
# include newly_left rooms because the last event that the user should see
# is their own leave event
# 2) Figure out newly_left rooms (> `from_token` and <= `to_token`).
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
filtered_sync_room_id_set[room_id] = _RoomMembershipForUser(
room_id=room_id,
event_id=last_membership_change_in_from_to_range.event_id,
event_pos=last_membership_change_in_from_to_range.event_pos,
membership=last_membership_change_in_from_to_range.membership,
sender=last_membership_change_in_from_to_range.sender,
newly_joined=False,
is_dm=False,
)
# 2) Mark this room as `newly_left`
# If we're seeing a membership change here, we should expect to already
# have it in our snapshot but if a state reset happens, it wouldn't have
# shown up in our snapshot but appear as a change here.
existing_sync_entry = sync_room_id_set.get(room_id)
if existing_sync_entry is not None:
# Normal expected case
sync_room_id_set[room_id] = existing_sync_entry.copy_and_replace(
newly_left=True
)
else:
# State reset!
logger.warn(
"State reset detected for room_id %s with %s who is no longer in the room",
room_id,
user_id,
)
# Even though a state reset happened which removed the person from
# the room, we still add it the list so the user knows they left the
# room. Downstream code can check for a state reset by looking for
# `event_id=None and membership is not None`.
sync_room_id_set[room_id] = _RoomMembershipForUser(
room_id=room_id,
event_id=last_membership_change_in_from_to_range.event_id,
event_pos=last_membership_change_in_from_to_range.event_pos,
membership=last_membership_change_in_from_to_range.membership,
sender=last_membership_change_in_from_to_range.sender,
newly_joined=False,
newly_left=True,
is_dm=False,
)
# 3) Figure out `newly_joined`
for room_id in possibly_newly_joined_room_ids:
@ -826,9 +890,9 @@ class SlidingSyncHandler:
# also some non-join in the range, we know they `newly_joined`.
if has_non_join_in_from_to_range:
# We found a `newly_joined` room (we left and joined within the token range)
filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
room_id
].copy_and_replace(newly_joined=True)
sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
newly_joined=True
)
else:
prev_event_id = first_membership_change_by_room_id_in_from_to_range[
room_id
@ -840,7 +904,7 @@ class SlidingSyncHandler:
if prev_event_id is None:
# We found a `newly_joined` room (we are joining the room for the
# first time within the token range)
filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
sync_room_id_set[room_id] = sync_room_id_set[
room_id
].copy_and_replace(newly_joined=True)
# Last resort, we need to step back to the previous membership event
@ -848,7 +912,7 @@ class SlidingSyncHandler:
elif prev_membership != Membership.JOIN:
# We found a `newly_joined` room (we left before the token range
# and joined within the token range)
filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
sync_room_id_set[room_id] = sync_room_id_set[
room_id
].copy_and_replace(newly_joined=True)
@ -876,12 +940,122 @@ class SlidingSyncHandler:
dm_room_id_set.add(room_id)
# 4) Fixup
for room_id in filtered_sync_room_id_set:
filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
room_id
].copy_and_replace(is_dm=room_id in dm_room_id_set)
for room_id in sync_room_id_set:
sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
is_dm=room_id in dm_room_id_set
)
return filtered_sync_room_id_set
return sync_room_id_set
async def filter_rooms_relevant_for_sync(
self,
user: UserID,
room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
) -> Dict[str, _RoomMembershipForUser]:
"""
Filter room IDs that should/can be listed for this user in the sync response (the
full room list that will be further filtered, sorted, and sliced).
We're looking for rooms where the user has the following state in the token
range (> `from_token` and <= `to_token`):
- `invite`, `join`, `knock`, `ban` membership events
- Kicks (`leave` membership events where `sender` is different from the
`user_id`/`state_key`)
- `newly_left` (rooms that were left during the given token range)
- In order for bans/kicks to not show up in sync, you need to `/forget` those
rooms. This doesn't modify the event itself though and only adds the
`forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
to tell when a room was forgotten at the moment so we can't factor it into the
from/to range.
Args:
user: User that is syncing
room_membership_for_user_map: Room membership for the user
Returns:
A dictionary of room IDs that should be listed in the sync response along
with membership information in that room at the time of `to_token`.
"""
user_id = user.to_string()
# Filter rooms to only what we're interested to sync with
filtered_sync_room_map = {
room_id: room_membership_for_user
for room_id, room_membership_for_user in room_membership_for_user_map.items()
if filter_membership_for_sync(
user_id=user_id,
room_membership_for_user=room_membership_for_user,
)
}
return filtered_sync_room_map
async def check_room_subscription_allowed_for_user(
self,
room_id: str,
room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
to_token: StreamToken,
) -> Optional[_RoomMembershipForUser]:
"""
Check whether the user is allowed to see the room based on whether they have
ever had membership in the room or if the room is `world_readable`.
Similar to `check_user_in_room_or_world_readable(...)`
Args:
room_id: Room to check
room_membership_for_user_map: Room membership for the user at the time of
the `to_token` (<= `to_token`).
to_token: The token to fetch rooms up to.
Returns:
The room membership for the user if they are allowed to subscribe to the
room else `None`.
"""
# We can first check if they are already allowed to see the room based
# on our previous work to assemble the `room_membership_for_user_map`.
#
# If they have had any membership in the room over time (up to the `to_token`),
# let them subscribe and see what they can.
existing_membership_for_user = room_membership_for_user_map.get(room_id)
if existing_membership_for_user is not None:
return existing_membership_for_user
# TODO: Handle `world_readable` rooms
return None
# If the room is `world_readable`, it doesn't matter whether they can join,
# everyone can see the room.
# not_in_room_membership_for_user = _RoomMembershipForUser(
# room_id=room_id,
# event_id=None,
# event_pos=None,
# membership=None,
# sender=None,
# newly_joined=False,
# newly_left=False,
# is_dm=False,
# )
# room_state = await self.get_current_state_at(
# room_id=room_id,
# room_membership_for_user_at_to_token=not_in_room_membership_for_user,
# state_filter=StateFilter.from_types(
# [(EventTypes.RoomHistoryVisibility, "")]
# ),
# to_token=to_token,
# )
# visibility_event = room_state.get((EventTypes.RoomHistoryVisibility, ""))
# if (
# visibility_event is not None
# and visibility_event.content.get("history_visibility")
# == HistoryVisibility.WORLD_READABLE
# ):
# return not_in_room_membership_for_user
# return None
async def filter_rooms(
self,
@ -1081,7 +1255,6 @@ class SlidingSyncHandler:
in the room at the time of `to_token`.
to_token: The point in the stream to sync up to.
"""
room_state_ids: StateMap[str]
# People shouldn't see past their leave/ban event
if room_membership_for_user_at_to_token.membership in (
@ -1349,10 +1522,10 @@ class SlidingSyncHandler:
stripped_state.append(strip_event(invite_or_knock_event))
# TODO: Handle state resets. For example, if we see
# `room_membership_for_user_at_to_token.membership = Membership.LEAVE` but
# `required_state` doesn't include it, we should indicate to the client that a
# state reset happened. Perhaps we should indicate this by setting `initial:
# True` and empty `required_state`.
# `room_membership_for_user_at_to_token.event_id=None and
# room_membership_for_user_at_to_token.membership is not None`, we should
# indicate to the client that a state reset happened. Perhaps we should indicate
# this by setting `initial: True` and empty `required_state`.
# TODO: Since we can't determine whether we've already sent a room down this
# Sliding Sync connection before (we plan to add this optimization in the

File diff suppressed because it is too large Load Diff

View File

@ -20,7 +20,8 @@
#
import json
import logging
from typing import AbstractSet, Any, Dict, Iterable, List, Optional
from http import HTTPStatus
from typing import Any, Dict, Iterable, List
from parameterized import parameterized, parameterized_class
@ -1259,7 +1260,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
exact: bool = False,
) -> None:
"""
Wrapper around `_assertIncludes` to give slightly better looking diff error
Wrapper around `assertIncludes` to give slightly better looking diff error
messages that include some context "$event_id (type, state_key)".
Args:
@ -1275,7 +1276,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
for event in actual_required_state:
assert isinstance(event, dict)
self._assertIncludes(
self.assertIncludes(
{
f'{event["event_id"]} ("{event["type"]}", "{event["state_key"]}")'
for event in actual_required_state
@ -1289,56 +1290,6 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
message=str(actual_required_state),
)
def _assertIncludes(
self,
actual_items: AbstractSet[str],
expected_items: AbstractSet[str],
exact: bool = False,
message: Optional[str] = None,
) -> None:
"""
Assert that all of the `expected_items` are included in the `actual_items`.
This assert could also be called `assertContains`, `assertItemsInSet`
Args:
actual_items: The container
expected_items: The items to check for in the container
exact: Whether the actual state should be exactly equal to the expected
state (no extras).
message: Optional message to include in the failure message.
"""
# Check that each set has the same items
if exact and actual_items == expected_items:
return
# Check for a superset
elif not exact and actual_items >= expected_items:
return
expected_lines: List[str] = []
for expected_item in expected_items:
is_expected_in_actual = expected_item in actual_items
expected_lines.append(
"{} {}".format(" " if is_expected_in_actual else "?", expected_item)
)
actual_lines: List[str] = []
for actual_item in actual_items:
is_actual_in_expected = actual_item in expected_items
actual_lines.append(
"{} {}".format("+" if is_actual_in_expected else " ", actual_item)
)
newline = "\n"
expected_string = f"Expected items to be in actual ('?' = missing expected items):\n {{\n{newline.join(expected_lines)}\n }}"
actual_string = f"Actual ('+' = found expected items):\n {{\n{newline.join(actual_lines)}\n }}"
first_message = (
"Items must match exactly" if exact else "Some expected items are missing."
)
diff_message = f"{first_message}\n{expected_string}\n{actual_string}"
self.fail(f"{diff_message}\n{message}")
def _add_new_dm_to_global_account_data(
self, source_user_id: str, target_user_id: str, target_room_id: str
) -> None:
@ -3868,6 +3819,13 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
body={"foo": "bar"},
tok=user2_tok,
)
self.helper.send_state(
room_id1,
event_type="org.matrix.bar_state",
state_key="",
body={"bar": "qux"},
tok=user2_tok,
)
# Make the Sliding Sync request with wildcards for the `state_key`
channel = self.make_request(
@ -3891,16 +3849,13 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
],
"timeline_limit": 0,
},
}
# TODO: Room subscription should also combine with the `required_state`
# "room_subscriptions": {
# room_id1: {
# "required_state": [
# ["org.matrix.bar_state", ""]
# ],
# "timeline_limit": 0,
# }
# }
},
"room_subscriptions": {
room_id1: {
"required_state": [["org.matrix.bar_state", ""]],
"timeline_limit": 0,
}
},
},
access_token=user1_tok,
)
@ -3917,6 +3872,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
state_map[(EventTypes.Member, user1_id)],
state_map[(EventTypes.Member, user2_id)],
state_map[("org.matrix.foo_state", "")],
state_map[("org.matrix.bar_state", "")],
},
exact=True,
)
@ -4009,6 +3965,271 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
channel.json_body["lists"]["foo-list"],
)
def test_room_subscriptions_with_join_membership(self) -> None:
"""
Test `room_subscriptions` with a joined room should give us timeline and current
state events.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
# Make the Sliding Sync request with just the room subscription
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"room_subscriptions": {
room_id1: {
"required_state": [
[EventTypes.Create, ""],
],
"timeline_limit": 1,
}
},
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
# We should see some state
self._assertRequiredStateIncludes(
channel.json_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
},
exact=True,
)
self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
# We should see some events
self.assertEqual(
[
event["event_id"]
for event in channel.json_body["rooms"][room_id1]["timeline"]
],
[
join_response["event_id"],
],
channel.json_body["rooms"][room_id1]["timeline"],
)
# No "live" events in an initial sync (no `from_token` to define the "live"
# range)
self.assertEqual(
channel.json_body["rooms"][room_id1]["num_live"],
0,
channel.json_body["rooms"][room_id1],
)
# There are more events to paginate to
self.assertEqual(
channel.json_body["rooms"][room_id1]["limited"],
True,
channel.json_body["rooms"][room_id1],
)
def test_room_subscriptions_with_leave_membership(self) -> None:
"""
Test `room_subscriptions` with a leave room should give us timeline and state
events up to the leave event.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.send_state(
room_id1,
event_type="org.matrix.foo_state",
state_key="",
body={"foo": "bar"},
tok=user2_tok,
)
join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
leave_response = self.helper.leave(room_id1, user1_id, tok=user1_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
# Send some events after user1 leaves
self.helper.send(room_id1, "activity after leave", tok=user2_tok)
# Update state after user1 leaves
self.helper.send_state(
room_id1,
event_type="org.matrix.foo_state",
state_key="",
body={"foo": "qux"},
tok=user2_tok,
)
# Make the Sliding Sync request with just the room subscription
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"room_subscriptions": {
room_id1: {
"required_state": [
["org.matrix.foo_state", ""],
],
"timeline_limit": 2,
}
},
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
# We should see the state at the time of the leave
self._assertRequiredStateIncludes(
channel.json_body["rooms"][room_id1]["required_state"],
{
state_map[("org.matrix.foo_state", "")],
},
exact=True,
)
self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
# We should see some before we left (nothing after)
self.assertEqual(
[
event["event_id"]
for event in channel.json_body["rooms"][room_id1]["timeline"]
],
[
join_response["event_id"],
leave_response["event_id"],
],
channel.json_body["rooms"][room_id1]["timeline"],
)
# No "live" events in an initial sync (no `from_token` to define the "live"
# range)
self.assertEqual(
channel.json_body["rooms"][room_id1]["num_live"],
0,
channel.json_body["rooms"][room_id1],
)
# There are more events to paginate to
self.assertEqual(
channel.json_body["rooms"][room_id1]["limited"],
True,
channel.json_body["rooms"][room_id1],
)
def test_room_subscriptions_no_leak_private_room(self) -> None:
"""
Test `room_subscriptions` with a private room we have never been in should not
leak any data to the user.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=False)
# We should not be able to join the private room
self.helper.join(
room_id1, user1_id, tok=user1_tok, expect_code=HTTPStatus.FORBIDDEN
)
# Make the Sliding Sync request with just the room subscription
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"room_subscriptions": {
room_id1: {
"required_state": [
[EventTypes.Create, ""],
],
"timeline_limit": 1,
}
},
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
# We should not see the room at all (we're not in it)
self.assertIsNone(
channel.json_body["rooms"].get(room_id1), channel.json_body["rooms"]
)
def test_room_subscriptions_world_readable(self) -> None:
"""
Test `room_subscriptions` with a room that has `world_readable` history visibility
FIXME: We should be able to see the room timeline and state
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
# Create a room with `world_readable` history visibility
room_id1 = self.helper.create_room_as(
user2_id,
tok=user2_tok,
extra_content={
"preset": "public_chat",
"initial_state": [
{
"content": {
"history_visibility": HistoryVisibility.WORLD_READABLE
},
"state_key": "",
"type": EventTypes.RoomHistoryVisibility,
}
],
},
)
# Ensure we're testing with a room with `world_readable` history visibility
# which means events are visible to anyone even without membership.
history_visibility_response = self.helper.get_state(
room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok
)
self.assertEqual(
history_visibility_response.get("history_visibility"),
HistoryVisibility.WORLD_READABLE,
)
# Note: We never join the room
# Make the Sliding Sync request with just the room subscription
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"room_subscriptions": {
room_id1: {
"required_state": [
[EventTypes.Create, ""],
],
"timeline_limit": 1,
}
},
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
# FIXME: In the future, we should be able to see the room because it's
# `world_readable` but currently we don't support this.
self.assertIsNone(
channel.json_body["rooms"].get(room_id1), channel.json_body["rooms"]
)
class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase):
"""Tests for the to-device sliding sync extension"""

View File

@ -28,6 +28,7 @@ import logging
import secrets
import time
from typing import (
AbstractSet,
Any,
Awaitable,
Callable,
@ -269,6 +270,56 @@ class TestCase(unittest.TestCase):
required[key], actual[key], msg="%s mismatch. %s" % (key, actual)
)
def assertIncludes(
self,
actual_items: AbstractSet[str],
expected_items: AbstractSet[str],
exact: bool = False,
message: Optional[str] = None,
) -> None:
"""
Assert that all of the `expected_items` are included in the `actual_items`.
This assert could also be called `assertContains`, `assertItemsInSet`
Args:
actual_items: The container
expected_items: The items to check for in the container
exact: Whether the actual state should be exactly equal to the expected
state (no extras).
message: Optional message to include in the failure message.
"""
# Check that each set has the same items
if exact and actual_items == expected_items:
return
# Check for a superset
elif not exact and actual_items >= expected_items:
return
expected_lines: List[str] = []
for expected_item in expected_items:
is_expected_in_actual = expected_item in actual_items
expected_lines.append(
"{} {}".format(" " if is_expected_in_actual else "?", expected_item)
)
actual_lines: List[str] = []
for actual_item in actual_items:
is_actual_in_expected = actual_item in expected_items
actual_lines.append(
"{} {}".format("+" if is_actual_in_expected else " ", actual_item)
)
newline = "\n"
expected_string = f"Expected items to be in actual ('?' = missing expected items):\n {{\n{newline.join(expected_lines)}\n }}"
actual_string = f"Actual ('+' = found expected items):\n {{\n{newline.join(actual_lines)}\n }}"
first_message = (
"Items must match exactly" if exact else "Some expected items are missing."
)
diff_message = f"{first_message}\n{expected_string}\n{actual_string}"
self.fail(f"{diff_message}\n{message}")
def DEBUG(target: TV) -> TV:
"""A decorator to set the .loglevel attribute to logging.DEBUG.