Add stream_ordering sort to Sliding Sync /sync (#17293)

Sort is no longer configurable and we always sort rooms by the `stream_ordering` of the last event in the room or the point where the user can see up to in cases of leave/ban/invite/knock.
This commit is contained in:
Eric Eastwood 2024-06-17 11:27:14 -05:00 committed by GitHub
parent e88332b5f4
commit e5b8a3e37f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 459 additions and 121 deletions

View File

@ -0,0 +1 @@
Add `stream_ordering` sort to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

View File

@ -201,7 +201,7 @@ class MessageHandler:
if at_token:
last_event_id = (
await self.store.get_last_event_in_room_before_stream_ordering(
await self.store.get_last_event_id_in_room_before_stream_ordering(
room_id,
end_token=at_token.room_key,
)

View File

@ -18,13 +18,20 @@
#
#
import logging
from typing import TYPE_CHECKING, AbstractSet, Dict, List, Optional
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from immutabledict import immutabledict
from synapse.api.constants import AccountDataTypes, Membership
from synapse.events import EventBase
from synapse.types import Requester, RoomStreamToken, StreamToken, UserID
from synapse.storage.roommember import RoomsForUser
from synapse.types import (
PersistedEventPosition,
Requester,
RoomStreamToken,
StreamToken,
UserID,
)
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
if TYPE_CHECKING:
@ -33,6 +40,27 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
def convert_event_to_rooms_for_user(event: EventBase) -> RoomsForUser:
"""
Quick helper to convert an event to a `RoomsForUser` object.
"""
# These fields should be present for all persisted events
assert event.internal_metadata.stream_ordering is not None
assert event.internal_metadata.instance_name is not None
return RoomsForUser(
room_id=event.room_id,
sender=event.sender,
membership=event.membership,
event_id=event.event_id,
event_pos=PersistedEventPosition(
event.internal_metadata.instance_name,
event.internal_metadata.stream_ordering,
),
room_version_id=event.room_version.identifier,
)
def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool:
"""
Returns True if the membership event should be included in the sync response,
@ -169,26 +197,28 @@ class SlidingSyncHandler:
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
# Get all of the room IDs that the user should be able to see in the sync
# response
room_id_set = await self.get_sync_room_ids_for_user(
sync_config.user,
from_token=from_token,
to_token=to_token,
)
# Assemble sliding window lists
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
if sync_config.lists:
# Get all of the room IDs that the user should be able to see in the sync
# response
sync_room_map = await self.get_sync_room_ids_for_user(
sync_config.user,
from_token=from_token,
to_token=to_token,
)
for list_key, list_config in sync_config.lists.items():
# Apply filters
filtered_room_ids = room_id_set
filtered_sync_room_map = sync_room_map
if list_config.filters is not None:
filtered_room_ids = await self.filter_rooms(
sync_config.user, room_id_set, list_config.filters, to_token
filtered_sync_room_map = await self.filter_rooms(
sync_config.user, sync_room_map, list_config.filters, to_token
)
# TODO: Apply sorts
sorted_room_ids = sorted(filtered_room_ids)
sorted_room_info = await self.sort_rooms(
filtered_sync_room_map, to_token
)
ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
if list_config.ranges:
@ -197,12 +227,17 @@ class SlidingSyncHandler:
SlidingSyncResult.SlidingWindowList.Operation(
op=OperationType.SYNC,
range=range,
room_ids=sorted_room_ids[range[0] : range[1]],
room_ids=[
room_id
for room_id, _ in sorted_room_info[
range[0] : range[1]
]
],
)
)
lists[list_key] = SlidingSyncResult.SlidingWindowList(
count=len(sorted_room_ids),
count=len(sorted_room_info),
ops=ops,
)
@ -219,7 +254,7 @@ class SlidingSyncHandler:
user: UserID,
to_token: StreamToken,
from_token: Optional[StreamToken] = None,
) -> AbstractSet[str]:
) -> Dict[str, RoomsForUser]:
"""
Fetch room IDs that should be listed for this user in the sync response (the
full room list that will be filtered, sorted, and sliced).
@ -237,11 +272,14 @@ class SlidingSyncHandler:
to tell when a room was forgotten at the moment so we can't factor it into the
from/to range.
Args:
user: User to fetch rooms for
to_token: The token to fetch rooms up to.
from_token: The point in the stream to sync from.
Returns:
A dictionary of room IDs that should be listed in the sync response along
with membership information in that room at the time of `to_token`.
"""
user_id = user.to_string()
@ -261,11 +299,11 @@ class SlidingSyncHandler:
# If the user has never joined any rooms before, we can just return an empty list
if not room_for_user_list:
return set()
return {}
# Our working list of rooms that can show up in the sync response
sync_room_id_set = {
room_for_user.room_id
room_for_user.room_id: room_for_user
for room_for_user in room_for_user_list
if filter_membership_for_sync(
membership=room_for_user.membership,
@ -415,7 +453,9 @@ class SlidingSyncHandler:
not was_last_membership_already_included
and should_prev_membership_be_included
):
sync_room_id_set.add(room_id)
sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
last_membership_change_after_to_token
)
# 1b) Remove rooms that the user joined (hasn't left) after the `to_token`
#
# For example, if the last membership event after the `to_token` is a "join"
@ -426,7 +466,7 @@ class SlidingSyncHandler:
was_last_membership_already_included
and not should_prev_membership_be_included
):
sync_room_id_set.discard(room_id)
del sync_room_id_set[room_id]
# 2) -----------------------------------------------------
# We fix-up newly_left rooms after the first fixup because it may have removed
@ -461,25 +501,32 @@ class SlidingSyncHandler:
# include newly_left rooms because the last event that the user should see
# is their own leave event
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
sync_room_id_set.add(room_id)
sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
last_membership_change_in_from_to_range
)
return sync_room_id_set
async def filter_rooms(
self,
user: UserID,
room_id_set: AbstractSet[str],
sync_room_map: Dict[str, RoomsForUser],
filters: SlidingSyncConfig.SlidingSyncList.Filters,
to_token: StreamToken,
) -> AbstractSet[str]:
) -> Dict[str, RoomsForUser]:
"""
Filter rooms based on the sync request.
Args:
user: User to filter rooms for
room_id_set: Set of room IDs to filter down
sync_room_map: Dictionary of room IDs to sort along with membership
information in the room at the time of `to_token`.
filters: Filters to apply
to_token: We filter based on the state of the room at this token
Returns:
A filtered dictionary of room IDs along with membership information in the
room at the time of `to_token`.
"""
user_id = user.to_string()
@ -488,7 +535,7 @@ class SlidingSyncHandler:
# TODO: Exclude partially stated rooms unless the `required_state` has
# `["m.room.member", "$LAZY"]`
filtered_room_id_set = set(room_id_set)
filtered_room_id_set = set(sync_room_map.keys())
# Filter for Direct-Message (DM) rooms
if filters.is_dm is not None:
@ -544,4 +591,57 @@ class SlidingSyncHandler:
if filters.not_tags:
raise NotImplementedError()
return filtered_room_id_set
# Assemble a new sync room map but only with the `filtered_room_id_set`
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
async def sort_rooms(
self,
sync_room_map: Dict[str, RoomsForUser],
to_token: StreamToken,
) -> List[Tuple[str, RoomsForUser]]:
"""
Sort by `stream_ordering` of the last event that the user should see in the
room. `stream_ordering` is unique so we get a stable sort.
Args:
sync_room_map: Dictionary of room IDs to sort along with membership
information in the room at the time of `to_token`.
to_token: We sort based on the events in the room at this token (<= `to_token`)
Returns:
A sorted list of room IDs by `stream_ordering` along with membership information.
"""
# Assemble a map of room ID to the `stream_ordering` of the last activity that the
# user should see in the room (<= `to_token`)
last_activity_in_room_map: Dict[str, int] = {}
for room_id, room_for_user in sync_room_map.items():
# If they are fully-joined to the room, let's find the latest activity
# at/before the `to_token`.
if room_for_user.membership == Membership.JOIN:
last_event_result = (
await self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id, to_token.room_key
)
)
# If the room has no events at/before the `to_token`, this is probably a
# mistake in the code that generates the `sync_room_map` since that should
# only give us rooms that the user had membership in during the token range.
assert last_event_result is not None
_, event_pos = last_event_result
last_activity_in_room_map[room_id] = event_pos.stream
else:
# Otherwise, if the user has left/been invited/knocked/been banned from
# a room, they shouldn't see anything past that point.
last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
return sorted(
sync_room_map.items(),
# Sort by the last activity (stream_ordering) in the room
key=lambda room_info: last_activity_in_room_map[room_info[0]],
# We want descending order
reverse=True,
)

View File

@ -1036,9 +1036,11 @@ class SyncHandler:
# FIXME: This gets the state at the latest event before the stream ordering,
# which might not be the same as the "current state" of the room at the time
# of the stream token if there were multiple forward extremities at the time.
last_event_id = await self.store.get_last_event_in_room_before_stream_ordering(
room_id,
end_token=stream_position.room_key,
last_event_id = (
await self.store.get_last_event_id_in_room_before_stream_ordering(
room_id,
end_token=stream_position.room_key,
)
)
if last_event_id:
@ -1519,7 +1521,7 @@ class SyncHandler:
# We need to make sure the first event in our batch points to the
# last event in the previous batch.
last_event_id_prev_batch = (
await self.store.get_last_event_in_room_before_stream_ordering(
await self.store.get_last_event_id_in_room_before_stream_ordering(
room_id,
end_token=since_token.room_key,
)

View File

@ -895,7 +895,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"get_room_event_before_stream_ordering", _f
)
async def get_last_event_in_room_before_stream_ordering(
async def get_last_event_id_in_room_before_stream_ordering(
self,
room_id: str,
end_token: RoomStreamToken,
@ -910,10 +910,38 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
The ID of the most recent event, or None if there are no events in the room
before this stream ordering.
"""
last_event_result = (
await self.get_last_event_pos_in_room_before_stream_ordering(
room_id, end_token
)
)
def get_last_event_in_room_before_stream_ordering_txn(
if last_event_result:
return last_event_result[0]
return None
async def get_last_event_pos_in_room_before_stream_ordering(
self,
room_id: str,
end_token: RoomStreamToken,
) -> Optional[Tuple[str, PersistedEventPosition]]:
"""
Returns the ID and event position of the last event in a room at or before a
stream ordering.
Args:
room_id
end_token: The token used to stream from
Returns:
The ID of the most recent event and it's position, or None if there are no
events in the room before this stream ordering.
"""
def get_last_event_pos_in_room_before_stream_ordering_txn(
txn: LoggingTransaction,
) -> Optional[str]:
) -> Optional[Tuple[str, PersistedEventPosition]]:
# We're looking for the closest event at or before the token. We need to
# handle the fact that the stream token can be a vector clock (with an
# `instance_map`) and events can be persisted on different instances
@ -975,13 +1003,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
topological_ordering=topological_ordering,
stream_ordering=stream_ordering,
):
return event_id
return event_id, PersistedEventPosition(
instance_name, stream_ordering
)
return None
return await self.db_pool.runInteraction(
"get_last_event_in_room_before_stream_ordering",
get_last_event_in_room_before_stream_ordering_txn,
"get_last_event_pos_in_room_before_stream_ordering",
get_last_event_pos_in_room_before_stream_ordering_txn,
)
async def get_current_room_stream_token_for_room_id(

View File

@ -175,22 +175,8 @@ class SlidingSyncBody(RequestBodyModel):
ranges: Sliding window ranges. If this field is missing, no sliding window
is used and all rooms are returned in this list. Integers are
*inclusive*.
sort: How the list should be sorted on the server. The first value is
applied first, then tiebreaks are performed with each subsequent sort
listed.
FIXME: Furthermore, it's not currently defined how servers should behave
if they encounter a filter or sort operation they do not recognise. If
the server rejects the request with an HTTP 400 then that will break
backwards compatibility with new clients vs old servers. However, the
client would be otherwise unaware that only some of the sort/filter
operations have taken effect. We may need to include a "warnings"
section to indicate which sort/filter operations are unrecognised,
allowing for some form of graceful degradation of service.
-- https://github.com/matrix-org/matrix-spec-proposals/blob/kegan/sync-v3/proposals/3575-sync.md#filter-and-sort-extensions
slow_get_all_rooms: Just get all rooms (for clients that don't want to deal with
sliding windows). When true, the `ranges` and `sort` fields are ignored.
sliding windows). When true, the `ranges` field is ignored.
required_state: Required state for each room returned. An array of event
type and state key tuples. Elements in this array are ORd together to
produce the final set of state events to return.
@ -229,12 +215,6 @@ class SlidingSyncBody(RequestBodyModel):
`user_id` and optionally `avatar_url` and `displayname`) for the users used
to calculate the room name.
filters: Filters to apply to the list before sorting.
bump_event_types: Allowlist of event types which should be considered recent activity
when sorting `by_recency`. By omitting event types from this field,
clients can ensure that uninteresting events (e.g. a profile rename) do
not cause a room to jump to the top of its list(s). Empty or omitted
`bump_event_types` have no effectall events in a room will be
considered recent activity.
"""
class Filters(RequestBodyModel):
@ -300,11 +280,9 @@ class SlidingSyncBody(RequestBodyModel):
ranges: Optional[List[Tuple[int, int]]] = None
else:
ranges: Optional[List[Tuple[conint(ge=0, strict=True), conint(ge=0, strict=True)]]] = None # type: ignore[valid-type]
sort: Optional[List[StrictStr]] = None
slow_get_all_rooms: Optional[StrictBool] = False
include_heroes: Optional[StrictBool] = False
filters: Optional[Filters] = None
bump_event_types: Optional[List[StrictStr]] = None
class RoomSubscription(CommonRoomParameters):
pass

View File

@ -20,6 +20,8 @@
import logging
from unittest.mock import patch
from parameterized import parameterized
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import AccountDataTypes, EventTypes, JoinRules, Membership
@ -79,7 +81,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
)
self.assertEqual(room_id_results, set())
self.assertEqual(room_id_results.keys(), set())
def test_get_newly_joined_room(self) -> None:
"""
@ -103,7 +105,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
)
self.assertEqual(room_id_results, {room_id})
self.assertEqual(room_id_results.keys(), {room_id})
def test_get_already_joined_room(self) -> None:
"""
@ -124,7 +126,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
)
self.assertEqual(room_id_results, {room_id})
self.assertEqual(room_id_results.keys(), {room_id})
def test_get_invited_banned_knocked_room(self) -> None:
"""
@ -180,7 +182,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# Ensure that the invited, ban, and knock rooms show up
self.assertEqual(
room_id_results,
room_id_results.keys(),
{
invited_room_id,
ban_room_id,
@ -226,7 +228,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
# The kicked room should show up
self.assertEqual(room_id_results, {kick_room_id})
self.assertEqual(room_id_results.keys(), {kick_room_id})
def test_forgotten_rooms(self) -> None:
"""
@ -308,7 +310,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
# We shouldn't see the room because it was forgotten
self.assertEqual(room_id_results, set())
self.assertEqual(room_id_results.keys(), set())
def test_only_newly_left_rooms_show_up(self) -> None:
"""
@ -340,7 +342,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
# Only the newly_left room should show up
self.assertEqual(room_id_results, {room_id2})
self.assertEqual(room_id_results.keys(), {room_id2})
def test_no_joins_after_to_token(self) -> None:
"""
@ -368,7 +370,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
)
self.assertEqual(room_id_results, {room_id1})
self.assertEqual(room_id_results.keys(), {room_id1})
def test_join_during_range_and_left_room_after_to_token(self) -> None:
"""
@ -398,7 +400,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# We should still see the room because we were joined during the
# from_token/to_token time period.
self.assertEqual(room_id_results, {room_id1})
self.assertEqual(room_id_results.keys(), {room_id1})
def test_join_before_range_and_left_room_after_to_token(self) -> None:
"""
@ -425,7 +427,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
# We should still see the room because we were joined before the `from_token`
self.assertEqual(room_id_results, {room_id1})
self.assertEqual(room_id_results.keys(), {room_id1})
def test_kicked_before_range_and_left_after_to_token(self) -> None:
"""
@ -473,7 +475,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
# We shouldn't see the room because it was forgotten
self.assertEqual(room_id_results, {kick_room_id})
self.assertEqual(room_id_results.keys(), {kick_room_id})
def test_newly_left_during_range_and_join_leave_after_to_token(self) -> None:
"""
@ -510,7 +512,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
# Room should still show up because it's newly_left during the from/to range
self.assertEqual(room_id_results, {room_id1})
self.assertEqual(room_id_results.keys(), {room_id1})
def test_newly_left_during_range_and_join_after_to_token(self) -> None:
"""
@ -546,7 +548,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
# Room should still show up because it's newly_left during the from/to range
self.assertEqual(room_id_results, {room_id1})
self.assertEqual(room_id_results.keys(), {room_id1})
def test_no_from_token(self) -> None:
"""
@ -587,7 +589,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
# Only rooms we were joined to before the `to_token` should show up
self.assertEqual(room_id_results, {room_id1})
self.assertEqual(room_id_results.keys(), {room_id1})
def test_from_token_ahead_of_to_token(self) -> None:
"""
@ -648,7 +650,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
#
# There won't be any newly_left rooms because the `from_token` is ahead of the
# `to_token` and that range will give no membership changes to check.
self.assertEqual(room_id_results, {room_id1})
self.assertEqual(room_id_results.keys(), {room_id1})
def test_leave_before_range_and_join_leave_after_to_token(self) -> None:
"""
@ -683,7 +685,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
# Room shouldn't show up because it was left before the `from_token`
self.assertEqual(room_id_results, set())
self.assertEqual(room_id_results.keys(), set())
def test_leave_before_range_and_join_after_to_token(self) -> None:
"""
@ -717,7 +719,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
# Room shouldn't show up because it was left before the `from_token`
self.assertEqual(room_id_results, set())
self.assertEqual(room_id_results.keys(), set())
def test_join_leave_multiple_times_during_range_and_after_to_token(
self,
@ -759,7 +761,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
# Room should show up because it was newly_left and joined during the from/to range
self.assertEqual(room_id_results, {room_id1})
self.assertEqual(room_id_results.keys(), {room_id1})
def test_join_leave_multiple_times_before_range_and_after_to_token(
self,
@ -799,7 +801,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
# Room should show up because we were joined before the from/to range
self.assertEqual(room_id_results, {room_id1})
self.assertEqual(room_id_results.keys(), {room_id1})
def test_invite_before_range_and_join_leave_after_to_token(
self,
@ -836,7 +838,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
# Room should show up because we were invited before the from/to range
self.assertEqual(room_id_results, {room_id1})
self.assertEqual(room_id_results.keys(), {room_id1})
def test_multiple_rooms_are_not_confused(
self,
@ -889,7 +891,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
self.assertEqual(
room_id_results,
room_id_results.keys(),
{
# `room_id1` shouldn't show up because we left before the from/to range
#
@ -1048,7 +1050,6 @@ class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
# Get a token while things are stuck after our activity
stuck_activity_token = self.event_sources.get_current_token()
logger.info("stuck_activity_token %s", stuck_activity_token)
# Let's make sure we're working with a token that has an `instance_map`
self.assertNotEqual(len(stuck_activity_token.room_key.instance_map), 0)
@ -1058,7 +1059,6 @@ class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
join_on_worker2_pos = self.get_success(
self.store.get_position_for_event(join_on_worker2_response["event_id"])
)
logger.info("join_on_worker2_pos %s", join_on_worker2_pos)
# Ensure the join technially came after our token
self.assertGreater(
join_on_worker2_pos.stream,
@ -1077,7 +1077,6 @@ class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
join_on_worker3_pos = self.get_success(
self.store.get_position_for_event(join_on_worker3_response["event_id"])
)
logger.info("join_on_worker3_pos %s", join_on_worker3_pos)
# Ensure the join came after the min but still encapsulated by the token
self.assertGreaterEqual(
join_on_worker3_pos.stream,
@ -1103,7 +1102,7 @@ class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
)
self.assertEqual(
room_id_results,
room_id_results.keys(),
{
room_id1,
# room_id2 shouldn't show up because we left before the from/to range
@ -1217,11 +1216,20 @@ class FilterRoomsTestCase(HomeserverTestCase):
after_rooms_token = self.event_sources.get_current_token()
# Get the rooms the user should be syncing with
sync_room_map = self.get_success(
self.sliding_sync_handler.get_sync_room_ids_for_user(
UserID.from_string(user1_id),
from_token=None,
to_token=after_rooms_token,
)
)
# Try with `is_dm=True`
truthy_filtered_room_ids = self.get_success(
truthy_filtered_room_map = self.get_success(
self.sliding_sync_handler.filter_rooms(
UserID.from_string(user1_id),
{room_id, dm_room_id},
sync_room_map,
SlidingSyncConfig.SlidingSyncList.Filters(
is_dm=True,
),
@ -1229,13 +1237,13 @@ class FilterRoomsTestCase(HomeserverTestCase):
)
)
self.assertEqual(truthy_filtered_room_ids, {dm_room_id})
self.assertEqual(truthy_filtered_room_map.keys(), {dm_room_id})
# Try with `is_dm=False`
falsy_filtered_room_ids = self.get_success(
falsy_filtered_room_map = self.get_success(
self.sliding_sync_handler.filter_rooms(
UserID.from_string(user1_id),
{room_id, dm_room_id},
sync_room_map,
SlidingSyncConfig.SlidingSyncList.Filters(
is_dm=False,
),
@ -1243,4 +1251,160 @@ class FilterRoomsTestCase(HomeserverTestCase):
)
)
self.assertEqual(falsy_filtered_room_ids, {room_id})
self.assertEqual(falsy_filtered_room_map.keys(), {room_id})
class SortRoomsTestCase(HomeserverTestCase):
"""
Tests Sliding Sync handler `sort_rooms()` to make sure it sorts/orders rooms
correctly.
"""
servlets = [
admin.register_servlets,
knock.register_servlets,
login.register_servlets,
room.register_servlets,
]
def default_config(self) -> JsonDict:
config = super().default_config()
# Enable sliding sync
config["experimental_features"] = {"msc3575_enabled": True}
return config
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
self.store = self.hs.get_datastores().main
self.event_sources = hs.get_event_sources()
def test_sort_activity_basic(self) -> None:
"""
Rooms with newer activity are sorted first.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(
user1_id,
tok=user1_tok,
)
room_id2 = self.helper.create_room_as(
user1_id,
tok=user1_tok,
)
after_rooms_token = self.event_sources.get_current_token()
# Get the rooms the user should be syncing with
sync_room_map = self.get_success(
self.sliding_sync_handler.get_sync_room_ids_for_user(
UserID.from_string(user1_id),
from_token=None,
to_token=after_rooms_token,
)
)
# Sort the rooms (what we're testing)
sorted_room_info = self.get_success(
self.sliding_sync_handler.sort_rooms(
sync_room_map=sync_room_map,
to_token=after_rooms_token,
)
)
self.assertEqual(
[room_id for room_id, _ in sorted_room_info],
[room_id2, room_id1],
)
@parameterized.expand(
[
(Membership.LEAVE,),
(Membership.INVITE,),
(Membership.KNOCK,),
(Membership.BAN,),
]
)
def test_activity_after_xxx(self, room1_membership: str) -> None:
"""
When someone has left/been invited/knocked/been banned from a room, they
shouldn't take anything into account after that membership event.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
before_rooms_token = self.event_sources.get_current_token()
# Create the rooms as user2 so we can have user1 with a clean slate to work from
# and join in whatever order we need for the tests.
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
# If we're testing knocks, set the room to knock
if room1_membership == Membership.KNOCK:
self.helper.send_state(
room_id1,
EventTypes.JoinRules,
{"join_rule": JoinRules.KNOCK},
tok=user2_tok,
)
room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
# Here is the activity with user1 that will determine the sort of the rooms
# (room2, room1, room3)
self.helper.join(room_id3, user1_id, tok=user1_tok)
if room1_membership == Membership.LEAVE:
self.helper.join(room_id1, user1_id, tok=user1_tok)
self.helper.leave(room_id1, user1_id, tok=user1_tok)
elif room1_membership == Membership.INVITE:
self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
elif room1_membership == Membership.KNOCK:
self.helper.knock(room_id1, user1_id, tok=user1_tok)
elif room1_membership == Membership.BAN:
self.helper.ban(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
self.helper.join(room_id2, user1_id, tok=user1_tok)
# Activity before the token but the user is only been xxx to this room so it
# shouldn't be taken into account
self.helper.send(room_id1, "activity in room1", tok=user2_tok)
after_rooms_token = self.event_sources.get_current_token()
# Activity after the token. Just make it in a different order than what we
# expect to make sure we're not taking the activity after the token into
# account.
self.helper.send(room_id1, "activity in room1", tok=user2_tok)
self.helper.send(room_id2, "activity in room2", tok=user2_tok)
self.helper.send(room_id3, "activity in room3", tok=user2_tok)
# Get the rooms the user should be syncing with
sync_room_map = self.get_success(
self.sliding_sync_handler.get_sync_room_ids_for_user(
UserID.from_string(user1_id),
from_token=before_rooms_token,
to_token=after_rooms_token,
)
)
# Sort the rooms (what we're testing)
sorted_room_info = self.get_success(
self.sliding_sync_handler.sort_rooms(
sync_room_map=sync_room_map,
to_token=after_rooms_token,
)
)
self.assertEqual(
[room_id for room_id, _ in sorted_room_info],
[room_id2, room_id1, room_id3],
"Corresponding map to disambiguate the opaque room IDs: "
+ str(
{
"room_id1": room_id1,
"room_id2": room_id2,
"room_id3": room_id3,
}
),
)

View File

@ -1299,7 +1299,6 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
"lists": {
"foo-list": {
"ranges": [[0, 99]],
"sort": ["by_notification_level", "by_recency", "by_name"],
"required_state": [
["m.room.join_rules", ""],
["m.room.history_visibility", ""],
@ -1361,7 +1360,6 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
"lists": {
"foo-list": {
"ranges": [[0, 99]],
"sort": ["by_notification_level", "by_recency", "by_name"],
"required_state": [
["m.room.join_rules", ""],
["m.room.history_visibility", ""],
@ -1415,14 +1413,12 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
"lists": {
"dms": {
"ranges": [[0, 99]],
"sort": ["by_recency"],
"required_state": [],
"timeline_limit": 1,
"filters": {"is_dm": True},
},
"foo-list": {
"ranges": [[0, 99]],
"sort": ["by_recency"],
"required_state": [],
"timeline_limit": 1,
"filters": {"is_dm": False},
@ -1463,3 +1459,60 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
],
list(channel.json_body["lists"]["foo-list"]),
)
def test_sort_list(self) -> None:
"""
Test that the lists are sorted by `stream_ordering`
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
# Activity that will order the rooms
self.helper.send(room_id3, "activity in room3", tok=user1_tok)
self.helper.send(room_id1, "activity in room1", tok=user1_tok)
self.helper.send(room_id2, "activity in room2", tok=user1_tok)
# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"foo-list": {
"ranges": [[0, 99]],
"required_state": [
["m.room.join_rules", ""],
["m.room.history_visibility", ""],
["m.space.child", "*"],
],
"timeline_limit": 1,
}
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
# Make sure it has the foo-list we requested
self.assertListEqual(
list(channel.json_body["lists"].keys()),
["foo-list"],
channel.json_body["lists"].keys(),
)
# Make sure the list is sorted in the way we expect
self.assertListEqual(
list(channel.json_body["lists"]["foo-list"]["ops"]),
[
{
"op": "SYNC",
"range": [0, 99],
"room_ids": [room_id2, room_id1, room_id3],
}
],
channel.json_body["lists"]["foo-list"],
)

View File

@ -277,7 +277,7 @@ class PaginationTestCase(HomeserverTestCase):
class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
"""
Test `get_last_event_in_room_before_stream_ordering(...)`
Test `get_last_event_pos_in_room_before_stream_ordering(...)`
"""
servlets = [
@ -336,14 +336,14 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
last_event_result = self.get_success(
self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id=room_id,
end_token=before_room_token.room_key,
)
)
self.assertIsNone(last_event)
self.assertIsNone(last_event_result)
def test_after_room_created(self) -> None:
"""
@ -356,14 +356,16 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
after_room_token = self.event_sources.get_current_token()
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
last_event_result = self.get_success(
self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id=room_id,
end_token=after_room_token.room_key,
)
)
assert last_event_result is not None
last_event_id, _ = last_event_result
self.assertIsNotNone(last_event)
self.assertIsNotNone(last_event_id)
def test_activity_in_other_rooms(self) -> None:
"""
@ -380,16 +382,18 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
after_room_token = self.event_sources.get_current_token()
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
last_event_result = self.get_success(
self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id=room_id1,
end_token=after_room_token.room_key,
)
)
assert last_event_result is not None
last_event_id, _ = last_event_result
# Make sure it's the event we expect (which also means we know it's from the
# correct room)
self.assertEqual(last_event, event_response["event_id"])
self.assertEqual(last_event_id, event_response["event_id"])
def test_activity_after_token_has_no_effect(self) -> None:
"""
@ -408,15 +412,17 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
self.helper.send(room_id1, "after1", tok=user1_tok)
self.helper.send(room_id1, "after2", tok=user1_tok)
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
last_event_result = self.get_success(
self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id=room_id1,
end_token=after_room_token.room_key,
)
)
assert last_event_result is not None
last_event_id, _ = last_event_result
# Make sure it's the last event before the token
self.assertEqual(last_event, event_response["event_id"])
self.assertEqual(last_event_id, event_response["event_id"])
def test_last_event_within_sharded_token(self) -> None:
"""
@ -457,18 +463,20 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
self.helper.send(room_id1, "after1", tok=user1_tok)
self.helper.send(room_id1, "after2", tok=user1_tok)
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
last_event_result = self.get_success(
self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id=room_id1,
end_token=end_token,
)
)
assert last_event_result is not None
last_event_id, _ = last_event_result
# Should find closest event at/before the token in room1
# Should find closest event before the token in room1
self.assertEqual(
last_event,
last_event_id,
event_response3["event_id"],
f"We expected {event_response3['event_id']} but saw {last_event} which corresponds to "
f"We expected {event_response3['event_id']} but saw {last_event_id} which corresponds to "
+ str(
{
"event1": event_response1["event_id"],
@ -514,18 +522,20 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
self.helper.send(room_id1, "after1", tok=user1_tok)
self.helper.send(room_id1, "after2", tok=user1_tok)
last_event = self.get_success(
self.store.get_last_event_in_room_before_stream_ordering(
last_event_result = self.get_success(
self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id=room_id1,
end_token=end_token,
)
)
assert last_event_result is not None
last_event_id, _ = last_event_result
# Should find closest event at/before the token in room1
# Should find closest event before the token in room1
self.assertEqual(
last_event,
last_event_id,
event_response2["event_id"],
f"We expected {event_response2['event_id']} but saw {last_event} which corresponds to "
f"We expected {event_response2['event_id']} but saw {last_event_id} which corresponds to "
+ str(
{
"event1": event_response1["event_id"],