diff --git a/changelog.d/17447.feature b/changelog.d/17447.feature new file mode 100644 index 000000000..6f80e298a --- /dev/null +++ b/changelog.d/17447.feature @@ -0,0 +1 @@ +Track which rooms have been sent to clients in the experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. diff --git a/pyproject.toml b/pyproject.toml index 0b5dc418e..1adf8e087 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -201,8 +201,8 @@ netaddr = ">=0.7.18" # add a lower bound to the Jinja2 dependency. Jinja2 = ">=3.0" bleach = ">=1.4.3" -# We use `Self`, which were added in `typing-extensions` 4.0. -typing-extensions = ">=4.0" +# We use `assert_never`, which were added in `typing-extensions` 4.1. +typing-extensions = ">=4.1" # We enforce that we have a `cryptography` version that bundles an `openssl` # with the latest security patches. cryptography = ">=3.4.7" diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 323157440..2b74f1c9c 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -18,6 +18,7 @@ # # import logging +from enum import Enum from itertools import chain from typing import ( TYPE_CHECKING, @@ -34,6 +35,7 @@ from typing import ( import attr from immutabledict import immutabledict +from typing_extensions import assert_never from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership from synapse.events import EventBase @@ -52,6 +54,7 @@ from synapse.types import ( RoomStreamToken, SlidingSyncStreamToken, StateMap, + StrCollection, StreamKeyType, StreamToken, UserID, @@ -361,6 +364,8 @@ class SlidingSyncHandler: self.push_rules_handler = hs.get_push_rules_handler() self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync + self.connection_store = SlidingSyncConnectionStore() + async def wait_for_sync_for_user( self, requester: Requester, @@ -464,6 +469,11 @@ class SlidingSyncHandler: # See https://github.com/matrix-org/matrix-doc/issues/1144 raise NotImplementedError() + await self.connection_store.mark_token_seen( + sync_config=sync_config, + from_token=from_token, + ) + # Get all of the room IDs that the user should be able to see in the sync # response has_lists = sync_config.lists is not None and len(sync_config.lists) > 0 @@ -613,7 +623,7 @@ class SlidingSyncHandler: @tag_args async def handle_room(room_id: str) -> None: room_sync_result = await self.get_room_sync_data( - user=sync_config.user, + sync_config=sync_config, room_id=room_id, room_sync_config=relevant_room_map[room_id], room_membership_for_user_at_to_token=room_membership_for_user_map[ @@ -635,11 +645,22 @@ class SlidingSyncHandler: to_token=to_token, ) - # TODO: Update this when we implement per-connection state - connection_token = 0 + if has_lists or has_room_subscriptions: + connection_position = await self.connection_store.record_rooms( + sync_config=sync_config, + from_token=from_token, + sent_room_ids=relevant_room_map.keys(), + # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids` + unsent_room_ids=[], + ) + elif from_token: + connection_position = from_token.connection_position + else: + # Initial sync without a `from_token` starts at `0` + connection_position = 0 return SlidingSyncResult( - next_pos=SlidingSyncStreamToken(to_token, connection_token), + next_pos=SlidingSyncStreamToken(to_token, connection_position), lists=lists, rooms=rooms, extensions=extensions, @@ -1370,7 +1391,7 @@ class SlidingSyncHandler: async def get_room_sync_data( self, - user: UserID, + sync_config: SlidingSyncConfig, room_id: str, room_sync_config: RoomSyncConfig, room_membership_for_user_at_to_token: _RoomMembershipForUser, @@ -1392,6 +1413,37 @@ class SlidingSyncHandler: from_token: The point in the stream to sync from. to_token: The point in the stream to sync up to. """ + user = sync_config.user + + # Determine whether we should limit the timeline to the token range. + # + # We should return historical messages (before token range) in the + # following cases because we want clients to be able to show a basic + # screen of information: + # - Initial sync (because no `from_token` to limit us anyway) + # - When users `newly_joined` + # - For an incremental sync where we haven't sent it down this + # connection before + from_bound = None + initial = True + if from_token and not room_membership_for_user_at_to_token.newly_joined: + room_status = await self.connection_store.have_sent_room( + sync_config=sync_config, + connection_token=from_token.connection_position, + room_id=room_id, + ) + if room_status.status == HaveSentRoomFlag.LIVE: + from_bound = from_token.stream_token.room_key + initial = False + elif room_status.status == HaveSentRoomFlag.PREVIOUSLY: + assert room_status.last_token is not None + from_bound = room_status.last_token + initial = False + elif room_status.status == HaveSentRoomFlag.NEVER: + from_bound = None + initial = True + else: + assert_never(room_status.status) # Assemble the list of timeline events # @@ -1418,36 +1470,23 @@ class SlidingSyncHandler: prev_batch_token = to_token # We're going to paginate backwards from the `to_token` - from_bound = to_token.room_key + to_bound = to_token.room_key # People shouldn't see past their leave/ban event if room_membership_for_user_at_to_token.membership in ( Membership.LEAVE, Membership.BAN, ): - from_bound = ( + to_bound = ( room_membership_for_user_at_to_token.event_pos.to_room_stream_token() ) - # Determine whether we should limit the timeline to the token range. - # - # We should return historical messages (before token range) in the - # following cases because we want clients to be able to show a basic - # screen of information: - # - Initial sync (because no `from_token` to limit us anyway) - # - When users `newly_joined` - # - TODO: For an incremental sync where we haven't sent it down this - # connection before - to_bound = ( - from_token.stream_token.room_key - if from_token is not None - and not room_membership_for_user_at_to_token.newly_joined - else None - ) - timeline_events, new_room_key = await self.store.paginate_room_events( room_id=room_id, - from_key=from_bound, - to_key=to_bound, + # The bounds are reversed so we can paginate backwards + # (from newer to older events) starting at to_bound. + # This ensures we fill the `limit` with the newest events first, + from_key=to_bound, + to_key=from_bound, direction=Direction.BACKWARDS, # We add one so we can determine if there are enough events to saturate # the limit or not (see `limited`) @@ -1564,12 +1603,6 @@ class SlidingSyncHandler: # indicate to the client that a state reset happened. Perhaps we should indicate # this by setting `initial: True` and empty `required_state`. - # TODO: Since we can't determine whether we've already sent a room down this - # Sliding Sync connection before (we plan to add this optimization in the - # future), we're always returning the requested room state instead of - # updates. - initial = True - # Check whether the room has a name set name_state_ids = await self.get_current_state_ids_at( room_id=room_id, @@ -1715,9 +1748,22 @@ class SlidingSyncHandler: to_token=to_token, ) else: - # TODO: Once we can figure out if we've sent a room down this connection before, - # we can return updates instead of the full required state. - raise NotImplementedError() + assert from_bound is not None + + # TODO: Limit the number of state events we're about to send down + # the room, if its too many we should change this to an + # `initial=True`? + deltas = await self.store.get_current_state_deltas_for_room( + room_id=room_id, + from_token=from_bound, + to_token=to_token.room_key, + ) + # TODO: Filter room state before fetching events + # TODO: Handle state resets where event_id is None + events = await self.store.get_events( + [d.event_id for d in deltas if d.event_id] + ) + room_state = {(s.type, s.state_key): s for s in events.values()} required_room_state: StateMap[EventBase] = {} if required_state_filter != StateFilter.none(): @@ -1863,7 +1909,7 @@ class SlidingSyncHandler: to_token: The point in the stream to sync up to. """ user_id = sync_config.user.to_string() - device_id = sync_config.device_id + device_id = sync_config.requester.device_id # Skip if the extension is not enabled if not to_device_request.enabled: @@ -1939,7 +1985,7 @@ class SlidingSyncHandler: from_token: The point in the stream to sync from. """ user_id = sync_config.user.to_string() - device_id = sync_config.device_id + device_id = sync_config.requester.device_id # Skip if the extension is not enabled if not e2ee_request.enabled: @@ -2094,3 +2140,235 @@ class SlidingSyncHandler: global_account_data_map=global_account_data_map, account_data_by_room_map=account_data_by_room_map, ) + + +class HaveSentRoomFlag(Enum): + """Flag for whether we have sent the room down a sliding sync connection. + + The valid state changes here are: + NEVER -> LIVE + LIVE -> PREVIOUSLY + PREVIOUSLY -> LIVE + """ + + # The room has never been sent down (or we have forgotten we have sent it + # down). + NEVER = 1 + + # We have previously sent the room down, but there are updates that we + # haven't sent down. + PREVIOUSLY = 2 + + # We have sent the room down and the client has received all updates. + LIVE = 3 + + +@attr.s(auto_attribs=True, slots=True, frozen=True) +class HaveSentRoom: + """Whether we have sent the room down a sliding sync connection. + + Attributes: + status: Flag of if we have or haven't sent down the room + last_token: If the flag is `PREVIOUSLY` then this is non-null and + contains the last stream token of the last updates we sent down + the room, i.e. we still need to send everything since then to the + client. + """ + + status: HaveSentRoomFlag + last_token: Optional[RoomStreamToken] + + @staticmethod + def previously(last_token: RoomStreamToken) -> "HaveSentRoom": + """Constructor for `PREVIOUSLY` flag.""" + return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token) + + +HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None) +HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None) + + +@attr.s(auto_attribs=True) +class SlidingSyncConnectionStore: + """In-memory store of per-connection state, including what rooms we have + previously sent down a sliding sync connection. + + Note: This is NOT safe to run in a worker setup because connection positions will + point to different sets of rooms on different workers. e.g. for the same connection, + a connection position of 5 might have totally different states on worker A and + worker B. + + One complication that we need to deal with here is needing to handle requests being + resent, i.e. if we sent down a room in a response that the client received, we must + consider the room *not* sent when we get the request again. + + This is handled by using an integer "token", which is returned to the client + as part of the sync token. For each connection we store a mapping from + tokens to the room states, and create a new entry when we send down new + rooms. + + Note that for any given sliding sync connection we will only store a maximum + of two different tokens: the previous token from the request and a new token + sent in the response. When we receive a request with a given token, we then + clear out all other entries with a different token. + + Attributes: + _connections: Mapping from `(user_id, conn_id)` to mapping of `token` + to mapping of room ID to `HaveSentRoom`. + """ + + # `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom` + _connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = ( + attr.Factory(dict) + ) + + async def have_sent_room( + self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str + ) -> HaveSentRoom: + """For the given user_id/conn_id/token, return whether we have + previously sent the room down + """ + + conn_key = self._get_connection_key(sync_config) + sync_statuses = self._connections.setdefault(conn_key, {}) + room_status = sync_statuses.get(connection_token, {}).get( + room_id, HAVE_SENT_ROOM_NEVER + ) + + return room_status + + async def record_rooms( + self, + sync_config: SlidingSyncConfig, + from_token: Optional[SlidingSyncStreamToken], + *, + sent_room_ids: StrCollection, + unsent_room_ids: StrCollection, + ) -> int: + """Record which rooms we have/haven't sent down in a new response + + Attributes: + sync_config + from_token: The since token from the request, if any + sent_room_ids: The set of room IDs that we have sent down as + part of this request (only needs to be ones we didn't + previously sent down). + unsent_room_ids: The set of room IDs that have had updates + since the `from_token`, but which were not included in + this request + """ + prev_connection_token = 0 + if from_token is not None: + prev_connection_token = from_token.connection_position + + # If there are no changes then this is a noop. + if not sent_room_ids and not unsent_room_ids: + return prev_connection_token + + conn_key = self._get_connection_key(sync_config) + sync_statuses = self._connections.setdefault(conn_key, {}) + + # Generate a new token, removing any existing entries in that token + # (which can happen if requests get resent). + new_store_token = prev_connection_token + 1 + sync_statuses.pop(new_store_token, None) + + # Copy over and update the room mappings. + new_room_statuses = dict(sync_statuses.get(prev_connection_token, {})) + + # Whether we have updated the `new_room_statuses`, if we don't by the + # end we can treat this as a noop. + have_updated = False + for room_id in sent_room_ids: + new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE + have_updated = True + + # Whether we add/update the entries for unsent rooms depends on the + # existing entry: + # - LIVE: We have previously sent down everything up to + # `last_room_token, so we update the entry to be `PREVIOUSLY` with + # `last_room_token`. + # - PREVIOUSLY: We have previously sent down everything up to *a* + # given token, so we don't need to update the entry. + # - NEVER: We have never previously sent down the room, and we haven't + # sent anything down this time either so we leave it as NEVER. + + # Work out the new state for unsent rooms that were `LIVE`. + if from_token: + new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key) + else: + new_unsent_state = HAVE_SENT_ROOM_NEVER + + for room_id in unsent_room_ids: + prev_state = new_room_statuses.get(room_id) + if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE: + new_room_statuses[room_id] = new_unsent_state + have_updated = True + + if not have_updated: + return prev_connection_token + + sync_statuses[new_store_token] = new_room_statuses + + return new_store_token + + async def mark_token_seen( + self, + sync_config: SlidingSyncConfig, + from_token: Optional[SlidingSyncStreamToken], + ) -> None: + """We have received a request with the given token, so we can clear out + any other tokens associated with the connection. + + If there is no from token then we have started afresh, and so we delete + all tokens associated with the device. + """ + # Clear out any tokens for the connection that doesn't match the one + # from the request. + + conn_key = self._get_connection_key(sync_config) + sync_statuses = self._connections.pop(conn_key, {}) + if from_token is None: + return + + sync_statuses = { + connection_token: room_statuses + for connection_token, room_statuses in sync_statuses.items() + if connection_token == from_token.connection_position + } + if sync_statuses: + self._connections[conn_key] = sync_statuses + + @staticmethod + def _get_connection_key(sync_config: SlidingSyncConfig) -> Tuple[str, str]: + """Return a unique identifier for this connection. + + The first part is simply the user ID. + + The second part is generally a combination of device ID and conn_id. + However, both these two are optional (e.g. puppet access tokens don't + have device IDs), so this handles those edge cases. + + We use this over the raw `conn_id` to avoid clashes between different + clients that use the same `conn_id`. Imagine a user uses a web client + that uses `conn_id: main_sync_loop` and an Android client that also has + a `conn_id: main_sync_loop`. + """ + + user_id = sync_config.user.to_string() + + # Only one sliding sync connection is allowed per given conn_id (empty + # or not). + conn_id = sync_config.conn_id or "" + + if sync_config.requester.device_id: + return (user_id, f"D/{sync_config.requester.device_id}/{conn_id}") + + if sync_config.requester.access_token_id: + # If we don't have a device, then the access token ID should be a + # stable ID. + return (user_id, f"A/{sync_config.requester.access_token_id}/{conn_id}") + + # If we have neither then its likely an AS or some weird token. Either + # way we can just fail here. + raise Exception("Cannot use sliding sync with access token type") diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 7cf1f5643..bf3ac8d48 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -881,7 +881,6 @@ class SlidingSyncRestServlet(RestServlet): ) user = requester.user - device_id = requester.device_id timeout = parse_integer(request, "timeout", default=0) # Position in the stream @@ -902,11 +901,12 @@ class SlidingSyncRestServlet(RestServlet): sync_config = SlidingSyncConfig( user=user, - device_id=device_id, + requester=requester, # FIXME: Currently, we're just manually copying the fields from the - # `SlidingSyncBody` into the config. How can we gurantee into the future + # `SlidingSyncBody` into the config. How can we guarantee into the future # that we don't forget any? I would like something more structured like # `copy_attributes(from=body, to=config)` + conn_id=body.conn_id, lists=body.lists, room_subscriptions=body.room_subscriptions, extensions=body.extensions, diff --git a/synapse/server.py b/synapse/server.py index 4a3f9ff93..46b9d83a0 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -559,6 +559,7 @@ class HomeServer(metaclass=abc.ABCMeta): def get_sync_handler(self) -> SyncHandler: return SyncHandler(self) + @cache_in_self def get_sliding_sync_handler(self) -> SlidingSyncHandler: return SlidingSyncHandler(self) diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py index 036972ac2..da3ebe66b 100644 --- a/synapse/storage/databases/main/state_deltas.py +++ b/synapse/storage/databases/main/state_deltas.py @@ -26,6 +26,8 @@ import attr from synapse.storage._base import SQLBaseStore from synapse.storage.database import LoggingTransaction +from synapse.storage.databases.main.stream import _filter_results_by_stream +from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache logger = logging.getLogger(__name__) @@ -156,3 +158,38 @@ class StateDeltasStore(SQLBaseStore): "get_max_stream_id_in_current_state_deltas", self._get_max_stream_id_in_current_state_deltas_txn, ) + + async def get_current_state_deltas_for_room( + self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken + ) -> List[StateDelta]: + """Get the state deltas between two tokens.""" + + def get_current_state_deltas_for_room_txn( + txn: LoggingTransaction, + ) -> List[StateDelta]: + sql = """ + SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id + FROM current_state_delta_stream + WHERE room_id = ? AND ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + """ + txn.execute( + sql, (room_id, from_token.stream, to_token.get_max_stream_pos()) + ) + + return [ + StateDelta( + stream_id=row[1], + room_id=room_id, + event_type=row[2], + state_key=row[3], + event_id=row[4], + prev_event_id=row[5], + ) + for row in txn + if _filter_results_by_stream(from_token, to_token, row[0], row[1]) + ] + + return await self.db_pool.runInteraction( + "get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn + ) diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index 479222a18..f3141b05a 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -35,6 +35,7 @@ from synapse.types import ( DeviceListUpdates, JsonDict, JsonMapping, + Requester, SlidingSyncStreamToken, StreamToken, UserID, @@ -109,7 +110,7 @@ class SlidingSyncConfig(SlidingSyncBody): """ user: UserID - device_id: Optional[str] + requester: Requester # Pydantic config class Config: diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py index 34e07ddac..dfe3b1e0f 100644 --- a/synapse/types/rest/client/__init__.py +++ b/synapse/types/rest/client/__init__.py @@ -120,6 +120,9 @@ class SlidingSyncBody(RequestBodyModel): Sliding Sync API request body. Attributes: + conn_id: An optional string to identify this connection to the server. + Only one sliding sync connection is allowed per given conn_id (empty + or not). lists: Sliding window API. A map of list key to list information (:class:`SlidingSyncList`). Max lists: 100. The list keys should be arbitrary strings which the client is using to refer to the list. Keep this @@ -343,6 +346,8 @@ class SlidingSyncBody(RequestBodyModel): e2ee: Optional[E2eeExtension] = None account_data: Optional[AccountDataExtension] = None + conn_id: Optional[str] + # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884 if TYPE_CHECKING: lists: Optional[Dict[str, SlidingSyncList]] = None diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 2bbbd95a7..3e7b8f76a 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -59,6 +59,7 @@ from synapse.types import ( StreamToken, UserID, ) +from synapse.types.handlers import SlidingSyncConfig from synapse.util import Clock from synapse.util.stringutils import random_string @@ -3676,13 +3677,52 @@ class SlidingSyncTestCase(SlidingSyncBase): # Make the incremental Sliding Sync request response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + # We only return updates but only if we've sent the room down the + # connection before. + self.assertIsNone(response_body["rooms"][room_id1].get("required_state")) + self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) + + def test_rooms_required_state_incremental_sync_restart(self) -> None: + """ + Test `rooms.required_state` returns requested state events in the room during an + incremental sync, after a restart (and so the in memory caches are reset). + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.RoomHistoryVisibility, ""], + # This one doesn't exist in the room + [EventTypes.Tombstone, ""], + ], + "timeline_limit": 1, + } + } + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Reset the in-memory cache + self.hs.get_sliding_sync_handler().connection_store._connections.clear() + + # Make the Sliding Sync request + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + # If the cache has been cleared then we do expect the state to come down state_map = self.get_success( self.storage_controllers.state.get_current_state(room_id1) ) - # The returned state doesn't change from initial to incremental sync. In the - # future, we will only return updates but only if we've sent the room down the - # connection before. self._assertRequiredStateIncludes( response_body["rooms"][room_id1]["required_state"], { @@ -4434,6 +4474,412 @@ class SlidingSyncTestCase(SlidingSyncBase): # `world_readable` but currently we don't support this. self.assertIsNone(response_body["rooms"].get(room_id1), response_body["rooms"]) + def test_rooms_required_state_incremental_sync_LIVE(self) -> None: + """Test that we only get state updates in incremental sync for rooms + we've already seen (LIVE). + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Make the Sliding Sync request + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.RoomHistoryVisibility, ""], + # This one doesn't exist in the room + [EventTypes.Name, ""], + ], + "timeline_limit": 0, + } + } + } + + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Create, "")], + state_map[(EventTypes.RoomHistoryVisibility, "")], + }, + exact=True, + ) + + # Send a state event + self.helper.send_state( + room_id1, EventTypes.Name, body={"name": "foo"}, tok=user2_tok + ) + + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + self.assertNotIn("initial", response_body["rooms"][room_id1]) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Name, "")], + }, + exact=True, + ) + + @parameterized.expand([(False,), (True,)]) + def test_rooms_timeline_incremental_sync_PREVIOUSLY(self, limited: bool) -> None: + """ + Test getting room data where we have previously sent down the room, but + we missed sending down some timeline events previously and so its status + is considered PREVIOUSLY. + + There are two versions of this test, one where there are more messages + than the timeline limit, and one where there isn't. + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + + self.helper.send(room_id1, "msg", tok=user1_tok) + + timeline_limit = 5 + conn_id = "conn_id" + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 0]], + "required_state": [], + "timeline_limit": timeline_limit, + } + }, + "conn_id": "conn_id", + } + + # The first room gets sent down the initial sync + response_body, initial_from_token = self.do_sync(sync_body, tok=user1_tok) + self.assertCountEqual( + response_body["rooms"].keys(), {room_id1}, response_body["rooms"] + ) + + # We now send down some events in room1 (depending on the test param). + expected_events = [] # The set of events in the timeline + if limited: + for _ in range(10): + resp = self.helper.send(room_id1, "msg1", tok=user1_tok) + expected_events.append(resp["event_id"]) + else: + resp = self.helper.send(room_id1, "msg1", tok=user1_tok) + expected_events.append(resp["event_id"]) + + # A second messages happens in the other room, so room1 won't get sent down. + self.helper.send(room_id2, "msg", tok=user1_tok) + + # Only the second room gets sent down sync. + response_body, from_token = self.do_sync( + sync_body, since=initial_from_token, tok=user1_tok + ) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id2}, response_body["rooms"] + ) + + # FIXME: This is a hack to record that the first room wasn't sent down + # sync, as we don't implement that currently. + sliding_sync_handler = self.hs.get_sliding_sync_handler() + requester = self.get_success( + self.hs.get_auth().get_user_by_access_token(user1_tok) + ) + sync_config = SlidingSyncConfig( + user=requester.user, + requester=requester, + conn_id=conn_id, + ) + + parsed_initial_from_token = self.get_success( + SlidingSyncStreamToken.from_string(self.store, initial_from_token) + ) + connection_position = self.get_success( + sliding_sync_handler.connection_store.record_rooms( + sync_config, + parsed_initial_from_token, + sent_room_ids=[], + unsent_room_ids=[room_id1], + ) + ) + + # FIXME: Now fix up `from_token` with new connect position above. + parsed_from_token = self.get_success( + SlidingSyncStreamToken.from_string(self.store, from_token) + ) + parsed_from_token = SlidingSyncStreamToken( + stream_token=parsed_from_token.stream_token, + connection_position=connection_position, + ) + from_token = self.get_success(parsed_from_token.to_string(self.store)) + + # We now send another event to room1, so we should sync all the missing events. + resp = self.helper.send(room_id1, "msg2", tok=user1_tok) + expected_events.append(resp["event_id"]) + + # This sync should contain the messages from room1 not yet sent down. + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id1}, response_body["rooms"] + ) + self.assertNotIn("initial", response_body["rooms"][room_id1]) + + self.assertEqual( + [ev["event_id"] for ev in response_body["rooms"][room_id1]["timeline"]], + expected_events[-timeline_limit:], + ) + self.assertEqual(response_body["rooms"][room_id1]["limited"], limited) + self.assertEqual(response_body["rooms"][room_id1].get("required_state"), None) + + def test_rooms_required_state_incremental_sync_PREVIOUSLY(self) -> None: + """ + Test getting room data where we have previously sent down the room, but + we missed sending down some state previously and so its status is + considered PREVIOUSLY. + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + + self.helper.send(room_id1, "msg", tok=user1_tok) + + conn_id = "conn_id" + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 0]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.RoomHistoryVisibility, ""], + # This one doesn't exist in the room + [EventTypes.Name, ""], + ], + "timeline_limit": 0, + } + }, + "conn_id": "conn_id", + } + + # The first room gets sent down the initial sync + response_body, initial_from_token = self.do_sync(sync_body, tok=user1_tok) + self.assertCountEqual( + response_body["rooms"].keys(), {room_id1}, response_body["rooms"] + ) + + # We now send down some state in room1 + resp = self.helper.send_state( + room_id1, EventTypes.Name, {"name": "foo"}, tok=user1_tok + ) + name_change_id = resp["event_id"] + + # A second messages happens in the other room, so room1 won't get sent down. + self.helper.send(room_id2, "msg", tok=user1_tok) + + # Only the second room gets sent down sync. + response_body, from_token = self.do_sync( + sync_body, since=initial_from_token, tok=user1_tok + ) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id2}, response_body["rooms"] + ) + + # FIXME: This is a hack to record that the first room wasn't sent down + # sync, as we don't implement that currently. + sliding_sync_handler = self.hs.get_sliding_sync_handler() + requester = self.get_success( + self.hs.get_auth().get_user_by_access_token(user1_tok) + ) + sync_config = SlidingSyncConfig( + user=requester.user, + requester=requester, + conn_id=conn_id, + ) + + parsed_initial_from_token = self.get_success( + SlidingSyncStreamToken.from_string(self.store, initial_from_token) + ) + connection_position = self.get_success( + sliding_sync_handler.connection_store.record_rooms( + sync_config, + parsed_initial_from_token, + sent_room_ids=[], + unsent_room_ids=[room_id1], + ) + ) + + # FIXME: Now fix up `from_token` with new connect position above. + parsed_from_token = self.get_success( + SlidingSyncStreamToken.from_string(self.store, from_token) + ) + parsed_from_token = SlidingSyncStreamToken( + stream_token=parsed_from_token.stream_token, + connection_position=connection_position, + ) + from_token = self.get_success(parsed_from_token.to_string(self.store)) + + # We now send another event to room1, so we should sync all the missing state. + self.helper.send(room_id1, "msg", tok=user1_tok) + + # This sync should contain the state changes from room1. + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id1}, response_body["rooms"] + ) + self.assertNotIn("initial", response_body["rooms"][room_id1]) + + # We should only see the name change. + self.assertEqual( + [ + ev["event_id"] + for ev in response_body["rooms"][room_id1]["required_state"] + ], + [name_change_id], + ) + + def test_rooms_required_state_incremental_sync_NEVER(self) -> None: + """ + Test getting `required_state` where we have NEVER sent down the room before + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + + self.helper.send(room_id1, "msg", tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 0]], + "required_state": [ + [EventTypes.Create, ""], + [EventTypes.RoomHistoryVisibility, ""], + # This one doesn't exist in the room + [EventTypes.Name, ""], + ], + "timeline_limit": 1, + } + }, + } + + # A message happens in the other room, so room1 won't get sent down. + self.helper.send(room_id2, "msg", tok=user1_tok) + + # Only the second room gets sent down sync. + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id2}, response_body["rooms"] + ) + + # We now send another event to room1, so we should send down the full + # room. + self.helper.send(room_id1, "msg2", tok=user1_tok) + + # This sync should contain the messages from room1 not yet sent down. + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id1}, response_body["rooms"] + ) + + self.assertEqual(response_body["rooms"][room_id1]["initial"], True) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Create, "")], + state_map[(EventTypes.RoomHistoryVisibility, "")], + }, + exact=True, + ) + + def test_rooms_timeline_incremental_sync_NEVER(self) -> None: + """ + Test getting timeline room data where we have NEVER sent down the room + before + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 0]], + "required_state": [], + "timeline_limit": 5, + } + }, + } + + expected_events = [] + for _ in range(4): + resp = self.helper.send(room_id1, "msg", tok=user1_tok) + expected_events.append(resp["event_id"]) + + # A message happens in the other room, so room1 won't get sent down. + self.helper.send(room_id2, "msg", tok=user1_tok) + + # Only the second room gets sent down sync. + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id2}, response_body["rooms"] + ) + + # We now send another event to room1 so it comes down sync + resp = self.helper.send(room_id1, "msg2", tok=user1_tok) + expected_events.append(resp["event_id"]) + + # This sync should contain the messages from room1 not yet sent down. + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + self.assertCountEqual( + response_body["rooms"].keys(), {room_id1}, response_body["rooms"] + ) + + self.assertEqual( + [ev["event_id"] for ev in response_body["rooms"][room_id1]["timeline"]], + expected_events, + ) + self.assertEqual(response_body["rooms"][room_id1]["limited"], True) + self.assertEqual(response_body["rooms"][room_id1]["initial"], True) + class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase): """Tests for the to-device sliding sync extension"""