Sliding Sync: Track whether we have sent rooms down to clients (#17447)

The basic idea is that we introduce a new token for a sliding sync
connection, which stores the mapping of room to room "status" (i.e. have
we sent the room down?). This token allows us to handle duplicate
requests properly. In future it can be used to store more
"per-connection" information safely.

In future this should be migrated into the DB, so its important that we
try to reduce the number of syncs where we need to update the
per-connection information. In this PoC this only happens when we: a)
send down a set of room for the first time, or b) we have previously
sent down a room and there are updates but we are not sending the room
down the sync (due to not falling in a list range)

Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
This commit is contained in:
Erik Johnston 2024-07-29 22:45:48 +01:00 committed by GitHub
parent 568051c0f0
commit be4a16ff44
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 814 additions and 45 deletions

View File

@ -0,0 +1 @@
Track which rooms have been sent to clients in the experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.

View File

@ -201,8 +201,8 @@ netaddr = ">=0.7.18"
# add a lower bound to the Jinja2 dependency. # add a lower bound to the Jinja2 dependency.
Jinja2 = ">=3.0" Jinja2 = ">=3.0"
bleach = ">=1.4.3" bleach = ">=1.4.3"
# We use `Self`, which were added in `typing-extensions` 4.0. # We use `assert_never`, which were added in `typing-extensions` 4.1.
typing-extensions = ">=4.0" typing-extensions = ">=4.1"
# We enforce that we have a `cryptography` version that bundles an `openssl` # We enforce that we have a `cryptography` version that bundles an `openssl`
# with the latest security patches. # with the latest security patches.
cryptography = ">=3.4.7" cryptography = ">=3.4.7"

View File

@ -18,6 +18,7 @@
# #
# #
import logging import logging
from enum import Enum
from itertools import chain from itertools import chain
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
@ -34,6 +35,7 @@ from typing import (
import attr import attr
from immutabledict import immutabledict from immutabledict import immutabledict
from typing_extensions import assert_never
from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
from synapse.events import EventBase from synapse.events import EventBase
@ -52,6 +54,7 @@ from synapse.types import (
RoomStreamToken, RoomStreamToken,
SlidingSyncStreamToken, SlidingSyncStreamToken,
StateMap, StateMap,
StrCollection,
StreamKeyType, StreamKeyType,
StreamToken, StreamToken,
UserID, UserID,
@ -361,6 +364,8 @@ class SlidingSyncHandler:
self.push_rules_handler = hs.get_push_rules_handler() self.push_rules_handler = hs.get_push_rules_handler()
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
self.connection_store = SlidingSyncConnectionStore()
async def wait_for_sync_for_user( async def wait_for_sync_for_user(
self, self,
requester: Requester, requester: Requester,
@ -464,6 +469,11 @@ class SlidingSyncHandler:
# See https://github.com/matrix-org/matrix-doc/issues/1144 # See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError() raise NotImplementedError()
await self.connection_store.mark_token_seen(
sync_config=sync_config,
from_token=from_token,
)
# Get all of the room IDs that the user should be able to see in the sync # Get all of the room IDs that the user should be able to see in the sync
# response # response
has_lists = sync_config.lists is not None and len(sync_config.lists) > 0 has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
@ -613,7 +623,7 @@ class SlidingSyncHandler:
@tag_args @tag_args
async def handle_room(room_id: str) -> None: async def handle_room(room_id: str) -> None:
room_sync_result = await self.get_room_sync_data( room_sync_result = await self.get_room_sync_data(
user=sync_config.user, sync_config=sync_config,
room_id=room_id, room_id=room_id,
room_sync_config=relevant_room_map[room_id], room_sync_config=relevant_room_map[room_id],
room_membership_for_user_at_to_token=room_membership_for_user_map[ room_membership_for_user_at_to_token=room_membership_for_user_map[
@ -635,11 +645,22 @@ class SlidingSyncHandler:
to_token=to_token, to_token=to_token,
) )
# TODO: Update this when we implement per-connection state if has_lists or has_room_subscriptions:
connection_token = 0 connection_position = await self.connection_store.record_rooms(
sync_config=sync_config,
from_token=from_token,
sent_room_ids=relevant_room_map.keys(),
# TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
unsent_room_ids=[],
)
elif from_token:
connection_position = from_token.connection_position
else:
# Initial sync without a `from_token` starts at `0`
connection_position = 0
return SlidingSyncResult( return SlidingSyncResult(
next_pos=SlidingSyncStreamToken(to_token, connection_token), next_pos=SlidingSyncStreamToken(to_token, connection_position),
lists=lists, lists=lists,
rooms=rooms, rooms=rooms,
extensions=extensions, extensions=extensions,
@ -1370,7 +1391,7 @@ class SlidingSyncHandler:
async def get_room_sync_data( async def get_room_sync_data(
self, self,
user: UserID, sync_config: SlidingSyncConfig,
room_id: str, room_id: str,
room_sync_config: RoomSyncConfig, room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser, room_membership_for_user_at_to_token: _RoomMembershipForUser,
@ -1392,6 +1413,37 @@ class SlidingSyncHandler:
from_token: The point in the stream to sync from. from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to. to_token: The point in the stream to sync up to.
""" """
user = sync_config.user
# Determine whether we should limit the timeline to the token range.
#
# We should return historical messages (before token range) in the
# following cases because we want clients to be able to show a basic
# screen of information:
# - Initial sync (because no `from_token` to limit us anyway)
# - When users `newly_joined`
# - For an incremental sync where we haven't sent it down this
# connection before
from_bound = None
initial = True
if from_token and not room_membership_for_user_at_to_token.newly_joined:
room_status = await self.connection_store.have_sent_room(
sync_config=sync_config,
connection_token=from_token.connection_position,
room_id=room_id,
)
if room_status.status == HaveSentRoomFlag.LIVE:
from_bound = from_token.stream_token.room_key
initial = False
elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
assert room_status.last_token is not None
from_bound = room_status.last_token
initial = False
elif room_status.status == HaveSentRoomFlag.NEVER:
from_bound = None
initial = True
else:
assert_never(room_status.status)
# Assemble the list of timeline events # Assemble the list of timeline events
# #
@ -1418,36 +1470,23 @@ class SlidingSyncHandler:
prev_batch_token = to_token prev_batch_token = to_token
# We're going to paginate backwards from the `to_token` # We're going to paginate backwards from the `to_token`
from_bound = to_token.room_key to_bound = to_token.room_key
# People shouldn't see past their leave/ban event # People shouldn't see past their leave/ban event
if room_membership_for_user_at_to_token.membership in ( if room_membership_for_user_at_to_token.membership in (
Membership.LEAVE, Membership.LEAVE,
Membership.BAN, Membership.BAN,
): ):
from_bound = ( to_bound = (
room_membership_for_user_at_to_token.event_pos.to_room_stream_token() room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
) )
# Determine whether we should limit the timeline to the token range.
#
# We should return historical messages (before token range) in the
# following cases because we want clients to be able to show a basic
# screen of information:
# - Initial sync (because no `from_token` to limit us anyway)
# - When users `newly_joined`
# - TODO: For an incremental sync where we haven't sent it down this
# connection before
to_bound = (
from_token.stream_token.room_key
if from_token is not None
and not room_membership_for_user_at_to_token.newly_joined
else None
)
timeline_events, new_room_key = await self.store.paginate_room_events( timeline_events, new_room_key = await self.store.paginate_room_events(
room_id=room_id, room_id=room_id,
from_key=from_bound, # The bounds are reversed so we can paginate backwards
to_key=to_bound, # (from newer to older events) starting at to_bound.
# This ensures we fill the `limit` with the newest events first,
from_key=to_bound,
to_key=from_bound,
direction=Direction.BACKWARDS, direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate # We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`) # the limit or not (see `limited`)
@ -1564,12 +1603,6 @@ class SlidingSyncHandler:
# indicate to the client that a state reset happened. Perhaps we should indicate # indicate to the client that a state reset happened. Perhaps we should indicate
# this by setting `initial: True` and empty `required_state`. # this by setting `initial: True` and empty `required_state`.
# TODO: Since we can't determine whether we've already sent a room down this
# Sliding Sync connection before (we plan to add this optimization in the
# future), we're always returning the requested room state instead of
# updates.
initial = True
# Check whether the room has a name set # Check whether the room has a name set
name_state_ids = await self.get_current_state_ids_at( name_state_ids = await self.get_current_state_ids_at(
room_id=room_id, room_id=room_id,
@ -1715,9 +1748,22 @@ class SlidingSyncHandler:
to_token=to_token, to_token=to_token,
) )
else: else:
# TODO: Once we can figure out if we've sent a room down this connection before, assert from_bound is not None
# we can return updates instead of the full required state.
raise NotImplementedError() # TODO: Limit the number of state events we're about to send down
# the room, if its too many we should change this to an
# `initial=True`?
deltas = await self.store.get_current_state_deltas_for_room(
room_id=room_id,
from_token=from_bound,
to_token=to_token.room_key,
)
# TODO: Filter room state before fetching events
# TODO: Handle state resets where event_id is None
events = await self.store.get_events(
[d.event_id for d in deltas if d.event_id]
)
room_state = {(s.type, s.state_key): s for s in events.values()}
required_room_state: StateMap[EventBase] = {} required_room_state: StateMap[EventBase] = {}
if required_state_filter != StateFilter.none(): if required_state_filter != StateFilter.none():
@ -1863,7 +1909,7 @@ class SlidingSyncHandler:
to_token: The point in the stream to sync up to. to_token: The point in the stream to sync up to.
""" """
user_id = sync_config.user.to_string() user_id = sync_config.user.to_string()
device_id = sync_config.device_id device_id = sync_config.requester.device_id
# Skip if the extension is not enabled # Skip if the extension is not enabled
if not to_device_request.enabled: if not to_device_request.enabled:
@ -1939,7 +1985,7 @@ class SlidingSyncHandler:
from_token: The point in the stream to sync from. from_token: The point in the stream to sync from.
""" """
user_id = sync_config.user.to_string() user_id = sync_config.user.to_string()
device_id = sync_config.device_id device_id = sync_config.requester.device_id
# Skip if the extension is not enabled # Skip if the extension is not enabled
if not e2ee_request.enabled: if not e2ee_request.enabled:
@ -2094,3 +2140,235 @@ class SlidingSyncHandler:
global_account_data_map=global_account_data_map, global_account_data_map=global_account_data_map,
account_data_by_room_map=account_data_by_room_map, account_data_by_room_map=account_data_by_room_map,
) )
class HaveSentRoomFlag(Enum):
"""Flag for whether we have sent the room down a sliding sync connection.
The valid state changes here are:
NEVER -> LIVE
LIVE -> PREVIOUSLY
PREVIOUSLY -> LIVE
"""
# The room has never been sent down (or we have forgotten we have sent it
# down).
NEVER = 1
# We have previously sent the room down, but there are updates that we
# haven't sent down.
PREVIOUSLY = 2
# We have sent the room down and the client has received all updates.
LIVE = 3
@attr.s(auto_attribs=True, slots=True, frozen=True)
class HaveSentRoom:
"""Whether we have sent the room down a sliding sync connection.
Attributes:
status: Flag of if we have or haven't sent down the room
last_token: If the flag is `PREVIOUSLY` then this is non-null and
contains the last stream token of the last updates we sent down
the room, i.e. we still need to send everything since then to the
client.
"""
status: HaveSentRoomFlag
last_token: Optional[RoomStreamToken]
@staticmethod
def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
"""Constructor for `PREVIOUSLY` flag."""
return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
@attr.s(auto_attribs=True)
class SlidingSyncConnectionStore:
"""In-memory store of per-connection state, including what rooms we have
previously sent down a sliding sync connection.
Note: This is NOT safe to run in a worker setup because connection positions will
point to different sets of rooms on different workers. e.g. for the same connection,
a connection position of 5 might have totally different states on worker A and
worker B.
One complication that we need to deal with here is needing to handle requests being
resent, i.e. if we sent down a room in a response that the client received, we must
consider the room *not* sent when we get the request again.
This is handled by using an integer "token", which is returned to the client
as part of the sync token. For each connection we store a mapping from
tokens to the room states, and create a new entry when we send down new
rooms.
Note that for any given sliding sync connection we will only store a maximum
of two different tokens: the previous token from the request and a new token
sent in the response. When we receive a request with a given token, we then
clear out all other entries with a different token.
Attributes:
_connections: Mapping from `(user_id, conn_id)` to mapping of `token`
to mapping of room ID to `HaveSentRoom`.
"""
# `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom`
_connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = (
attr.Factory(dict)
)
async def have_sent_room(
self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str
) -> HaveSentRoom:
"""For the given user_id/conn_id/token, return whether we have
previously sent the room down
"""
conn_key = self._get_connection_key(sync_config)
sync_statuses = self._connections.setdefault(conn_key, {})
room_status = sync_statuses.get(connection_token, {}).get(
room_id, HAVE_SENT_ROOM_NEVER
)
return room_status
async def record_rooms(
self,
sync_config: SlidingSyncConfig,
from_token: Optional[SlidingSyncStreamToken],
*,
sent_room_ids: StrCollection,
unsent_room_ids: StrCollection,
) -> int:
"""Record which rooms we have/haven't sent down in a new response
Attributes:
sync_config
from_token: The since token from the request, if any
sent_room_ids: The set of room IDs that we have sent down as
part of this request (only needs to be ones we didn't
previously sent down).
unsent_room_ids: The set of room IDs that have had updates
since the `from_token`, but which were not included in
this request
"""
prev_connection_token = 0
if from_token is not None:
prev_connection_token = from_token.connection_position
# If there are no changes then this is a noop.
if not sent_room_ids and not unsent_room_ids:
return prev_connection_token
conn_key = self._get_connection_key(sync_config)
sync_statuses = self._connections.setdefault(conn_key, {})
# Generate a new token, removing any existing entries in that token
# (which can happen if requests get resent).
new_store_token = prev_connection_token + 1
sync_statuses.pop(new_store_token, None)
# Copy over and update the room mappings.
new_room_statuses = dict(sync_statuses.get(prev_connection_token, {}))
# Whether we have updated the `new_room_statuses`, if we don't by the
# end we can treat this as a noop.
have_updated = False
for room_id in sent_room_ids:
new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
have_updated = True
# Whether we add/update the entries for unsent rooms depends on the
# existing entry:
# - LIVE: We have previously sent down everything up to
# `last_room_token, so we update the entry to be `PREVIOUSLY` with
# `last_room_token`.
# - PREVIOUSLY: We have previously sent down everything up to *a*
# given token, so we don't need to update the entry.
# - NEVER: We have never previously sent down the room, and we haven't
# sent anything down this time either so we leave it as NEVER.
# Work out the new state for unsent rooms that were `LIVE`.
if from_token:
new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
else:
new_unsent_state = HAVE_SENT_ROOM_NEVER
for room_id in unsent_room_ids:
prev_state = new_room_statuses.get(room_id)
if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
new_room_statuses[room_id] = new_unsent_state
have_updated = True
if not have_updated:
return prev_connection_token
sync_statuses[new_store_token] = new_room_statuses
return new_store_token
async def mark_token_seen(
self,
sync_config: SlidingSyncConfig,
from_token: Optional[SlidingSyncStreamToken],
) -> None:
"""We have received a request with the given token, so we can clear out
any other tokens associated with the connection.
If there is no from token then we have started afresh, and so we delete
all tokens associated with the device.
"""
# Clear out any tokens for the connection that doesn't match the one
# from the request.
conn_key = self._get_connection_key(sync_config)
sync_statuses = self._connections.pop(conn_key, {})
if from_token is None:
return
sync_statuses = {
connection_token: room_statuses
for connection_token, room_statuses in sync_statuses.items()
if connection_token == from_token.connection_position
}
if sync_statuses:
self._connections[conn_key] = sync_statuses
@staticmethod
def _get_connection_key(sync_config: SlidingSyncConfig) -> Tuple[str, str]:
"""Return a unique identifier for this connection.
The first part is simply the user ID.
The second part is generally a combination of device ID and conn_id.
However, both these two are optional (e.g. puppet access tokens don't
have device IDs), so this handles those edge cases.
We use this over the raw `conn_id` to avoid clashes between different
clients that use the same `conn_id`. Imagine a user uses a web client
that uses `conn_id: main_sync_loop` and an Android client that also has
a `conn_id: main_sync_loop`.
"""
user_id = sync_config.user.to_string()
# Only one sliding sync connection is allowed per given conn_id (empty
# or not).
conn_id = sync_config.conn_id or ""
if sync_config.requester.device_id:
return (user_id, f"D/{sync_config.requester.device_id}/{conn_id}")
if sync_config.requester.access_token_id:
# If we don't have a device, then the access token ID should be a
# stable ID.
return (user_id, f"A/{sync_config.requester.access_token_id}/{conn_id}")
# If we have neither then its likely an AS or some weird token. Either
# way we can just fail here.
raise Exception("Cannot use sliding sync with access token type")

View File

@ -881,7 +881,6 @@ class SlidingSyncRestServlet(RestServlet):
) )
user = requester.user user = requester.user
device_id = requester.device_id
timeout = parse_integer(request, "timeout", default=0) timeout = parse_integer(request, "timeout", default=0)
# Position in the stream # Position in the stream
@ -902,11 +901,12 @@ class SlidingSyncRestServlet(RestServlet):
sync_config = SlidingSyncConfig( sync_config = SlidingSyncConfig(
user=user, user=user,
device_id=device_id, requester=requester,
# FIXME: Currently, we're just manually copying the fields from the # FIXME: Currently, we're just manually copying the fields from the
# `SlidingSyncBody` into the config. How can we gurantee into the future # `SlidingSyncBody` into the config. How can we guarantee into the future
# that we don't forget any? I would like something more structured like # that we don't forget any? I would like something more structured like
# `copy_attributes(from=body, to=config)` # `copy_attributes(from=body, to=config)`
conn_id=body.conn_id,
lists=body.lists, lists=body.lists,
room_subscriptions=body.room_subscriptions, room_subscriptions=body.room_subscriptions,
extensions=body.extensions, extensions=body.extensions,

View File

@ -559,6 +559,7 @@ class HomeServer(metaclass=abc.ABCMeta):
def get_sync_handler(self) -> SyncHandler: def get_sync_handler(self) -> SyncHandler:
return SyncHandler(self) return SyncHandler(self)
@cache_in_self
def get_sliding_sync_handler(self) -> SlidingSyncHandler: def get_sliding_sync_handler(self) -> SlidingSyncHandler:
return SlidingSyncHandler(self) return SlidingSyncHandler(self)

View File

@ -26,6 +26,8 @@ import attr
from synapse.storage._base import SQLBaseStore from synapse.storage._base import SQLBaseStore
from synapse.storage.database import LoggingTransaction from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main.stream import _filter_results_by_stream
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.caches.stream_change_cache import StreamChangeCache
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -156,3 +158,38 @@ class StateDeltasStore(SQLBaseStore):
"get_max_stream_id_in_current_state_deltas", "get_max_stream_id_in_current_state_deltas",
self._get_max_stream_id_in_current_state_deltas_txn, self._get_max_stream_id_in_current_state_deltas_txn,
) )
async def get_current_state_deltas_for_room(
self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
) -> List[StateDelta]:
"""Get the state deltas between two tokens."""
def get_current_state_deltas_for_room_txn(
txn: LoggingTransaction,
) -> List[StateDelta]:
sql = """
SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
FROM current_state_delta_stream
WHERE room_id = ? AND ? < stream_id AND stream_id <= ?
ORDER BY stream_id ASC
"""
txn.execute(
sql, (room_id, from_token.stream, to_token.get_max_stream_pos())
)
return [
StateDelta(
stream_id=row[1],
room_id=room_id,
event_type=row[2],
state_key=row[3],
event_id=row[4],
prev_event_id=row[5],
)
for row in txn
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
]
return await self.db_pool.runInteraction(
"get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
)

View File

@ -35,6 +35,7 @@ from synapse.types import (
DeviceListUpdates, DeviceListUpdates,
JsonDict, JsonDict,
JsonMapping, JsonMapping,
Requester,
SlidingSyncStreamToken, SlidingSyncStreamToken,
StreamToken, StreamToken,
UserID, UserID,
@ -109,7 +110,7 @@ class SlidingSyncConfig(SlidingSyncBody):
""" """
user: UserID user: UserID
device_id: Optional[str] requester: Requester
# Pydantic config # Pydantic config
class Config: class Config:

View File

@ -120,6 +120,9 @@ class SlidingSyncBody(RequestBodyModel):
Sliding Sync API request body. Sliding Sync API request body.
Attributes: Attributes:
conn_id: An optional string to identify this connection to the server.
Only one sliding sync connection is allowed per given conn_id (empty
or not).
lists: Sliding window API. A map of list key to list information lists: Sliding window API. A map of list key to list information
(:class:`SlidingSyncList`). Max lists: 100. The list keys should be (:class:`SlidingSyncList`). Max lists: 100. The list keys should be
arbitrary strings which the client is using to refer to the list. Keep this arbitrary strings which the client is using to refer to the list. Keep this
@ -343,6 +346,8 @@ class SlidingSyncBody(RequestBodyModel):
e2ee: Optional[E2eeExtension] = None e2ee: Optional[E2eeExtension] = None
account_data: Optional[AccountDataExtension] = None account_data: Optional[AccountDataExtension] = None
conn_id: Optional[str]
# mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884 # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
if TYPE_CHECKING: if TYPE_CHECKING:
lists: Optional[Dict[str, SlidingSyncList]] = None lists: Optional[Dict[str, SlidingSyncList]] = None

View File

@ -59,6 +59,7 @@ from synapse.types import (
StreamToken, StreamToken,
UserID, UserID,
) )
from synapse.types.handlers import SlidingSyncConfig
from synapse.util import Clock from synapse.util import Clock
from synapse.util.stringutils import random_string from synapse.util.stringutils import random_string
@ -3676,13 +3677,52 @@ class SlidingSyncTestCase(SlidingSyncBase):
# Make the incremental Sliding Sync request # Make the incremental Sliding Sync request
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# We only return updates but only if we've sent the room down the
# connection before.
self.assertIsNone(response_body["rooms"][room_id1].get("required_state"))
self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
def test_rooms_required_state_incremental_sync_restart(self) -> None:
"""
Test `rooms.required_state` returns requested state events in the room during an
incremental sync, after a restart (and so the in memory caches are reset).
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Create, ""],
[EventTypes.RoomHistoryVisibility, ""],
# This one doesn't exist in the room
[EventTypes.Tombstone, ""],
],
"timeline_limit": 1,
}
}
}
_, from_token = self.do_sync(sync_body, tok=user1_tok)
# Reset the in-memory cache
self.hs.get_sliding_sync_handler().connection_store._connections.clear()
# Make the Sliding Sync request
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
# If the cache has been cleared then we do expect the state to come down
state_map = self.get_success( state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1) self.storage_controllers.state.get_current_state(room_id1)
) )
# The returned state doesn't change from initial to incremental sync. In the
# future, we will only return updates but only if we've sent the room down the
# connection before.
self._assertRequiredStateIncludes( self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"], response_body["rooms"][room_id1]["required_state"],
{ {
@ -4434,6 +4474,412 @@ class SlidingSyncTestCase(SlidingSyncBase):
# `world_readable` but currently we don't support this. # `world_readable` but currently we don't support this.
self.assertIsNone(response_body["rooms"].get(room_id1), response_body["rooms"]) self.assertIsNone(response_body["rooms"].get(room_id1), response_body["rooms"])
def test_rooms_required_state_incremental_sync_LIVE(self) -> None:
"""Test that we only get state updates in incremental sync for rooms
we've already seen (LIVE).
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id1, user1_id, tok=user1_tok)
# Make the Sliding Sync request
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Create, ""],
[EventTypes.RoomHistoryVisibility, ""],
# This one doesn't exist in the room
[EventTypes.Name, ""],
],
"timeline_limit": 0,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
state_map[(EventTypes.RoomHistoryVisibility, "")],
},
exact=True,
)
# Send a state event
self.helper.send_state(
room_id1, EventTypes.Name, body={"name": "foo"}, tok=user2_tok
)
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self.assertNotIn("initial", response_body["rooms"][room_id1])
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Name, "")],
},
exact=True,
)
@parameterized.expand([(False,), (True,)])
def test_rooms_timeline_incremental_sync_PREVIOUSLY(self, limited: bool) -> None:
"""
Test getting room data where we have previously sent down the room, but
we missed sending down some timeline events previously and so its status
is considered PREVIOUSLY.
There are two versions of this test, one where there are more messages
than the timeline limit, and one where there isn't.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.helper.send(room_id1, "msg", tok=user1_tok)
timeline_limit = 5
conn_id = "conn_id"
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 0]],
"required_state": [],
"timeline_limit": timeline_limit,
}
},
"conn_id": "conn_id",
}
# The first room gets sent down the initial sync
response_body, initial_from_token = self.do_sync(sync_body, tok=user1_tok)
self.assertCountEqual(
response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
)
# We now send down some events in room1 (depending on the test param).
expected_events = [] # The set of events in the timeline
if limited:
for _ in range(10):
resp = self.helper.send(room_id1, "msg1", tok=user1_tok)
expected_events.append(resp["event_id"])
else:
resp = self.helper.send(room_id1, "msg1", tok=user1_tok)
expected_events.append(resp["event_id"])
# A second messages happens in the other room, so room1 won't get sent down.
self.helper.send(room_id2, "msg", tok=user1_tok)
# Only the second room gets sent down sync.
response_body, from_token = self.do_sync(
sync_body, since=initial_from_token, tok=user1_tok
)
self.assertCountEqual(
response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
)
# FIXME: This is a hack to record that the first room wasn't sent down
# sync, as we don't implement that currently.
sliding_sync_handler = self.hs.get_sliding_sync_handler()
requester = self.get_success(
self.hs.get_auth().get_user_by_access_token(user1_tok)
)
sync_config = SlidingSyncConfig(
user=requester.user,
requester=requester,
conn_id=conn_id,
)
parsed_initial_from_token = self.get_success(
SlidingSyncStreamToken.from_string(self.store, initial_from_token)
)
connection_position = self.get_success(
sliding_sync_handler.connection_store.record_rooms(
sync_config,
parsed_initial_from_token,
sent_room_ids=[],
unsent_room_ids=[room_id1],
)
)
# FIXME: Now fix up `from_token` with new connect position above.
parsed_from_token = self.get_success(
SlidingSyncStreamToken.from_string(self.store, from_token)
)
parsed_from_token = SlidingSyncStreamToken(
stream_token=parsed_from_token.stream_token,
connection_position=connection_position,
)
from_token = self.get_success(parsed_from_token.to_string(self.store))
# We now send another event to room1, so we should sync all the missing events.
resp = self.helper.send(room_id1, "msg2", tok=user1_tok)
expected_events.append(resp["event_id"])
# This sync should contain the messages from room1 not yet sent down.
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
self.assertCountEqual(
response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
)
self.assertNotIn("initial", response_body["rooms"][room_id1])
self.assertEqual(
[ev["event_id"] for ev in response_body["rooms"][room_id1]["timeline"]],
expected_events[-timeline_limit:],
)
self.assertEqual(response_body["rooms"][room_id1]["limited"], limited)
self.assertEqual(response_body["rooms"][room_id1].get("required_state"), None)
def test_rooms_required_state_incremental_sync_PREVIOUSLY(self) -> None:
"""
Test getting room data where we have previously sent down the room, but
we missed sending down some state previously and so its status is
considered PREVIOUSLY.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.helper.send(room_id1, "msg", tok=user1_tok)
conn_id = "conn_id"
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 0]],
"required_state": [
[EventTypes.Create, ""],
[EventTypes.RoomHistoryVisibility, ""],
# This one doesn't exist in the room
[EventTypes.Name, ""],
],
"timeline_limit": 0,
}
},
"conn_id": "conn_id",
}
# The first room gets sent down the initial sync
response_body, initial_from_token = self.do_sync(sync_body, tok=user1_tok)
self.assertCountEqual(
response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
)
# We now send down some state in room1
resp = self.helper.send_state(
room_id1, EventTypes.Name, {"name": "foo"}, tok=user1_tok
)
name_change_id = resp["event_id"]
# A second messages happens in the other room, so room1 won't get sent down.
self.helper.send(room_id2, "msg", tok=user1_tok)
# Only the second room gets sent down sync.
response_body, from_token = self.do_sync(
sync_body, since=initial_from_token, tok=user1_tok
)
self.assertCountEqual(
response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
)
# FIXME: This is a hack to record that the first room wasn't sent down
# sync, as we don't implement that currently.
sliding_sync_handler = self.hs.get_sliding_sync_handler()
requester = self.get_success(
self.hs.get_auth().get_user_by_access_token(user1_tok)
)
sync_config = SlidingSyncConfig(
user=requester.user,
requester=requester,
conn_id=conn_id,
)
parsed_initial_from_token = self.get_success(
SlidingSyncStreamToken.from_string(self.store, initial_from_token)
)
connection_position = self.get_success(
sliding_sync_handler.connection_store.record_rooms(
sync_config,
parsed_initial_from_token,
sent_room_ids=[],
unsent_room_ids=[room_id1],
)
)
# FIXME: Now fix up `from_token` with new connect position above.
parsed_from_token = self.get_success(
SlidingSyncStreamToken.from_string(self.store, from_token)
)
parsed_from_token = SlidingSyncStreamToken(
stream_token=parsed_from_token.stream_token,
connection_position=connection_position,
)
from_token = self.get_success(parsed_from_token.to_string(self.store))
# We now send another event to room1, so we should sync all the missing state.
self.helper.send(room_id1, "msg", tok=user1_tok)
# This sync should contain the state changes from room1.
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
self.assertCountEqual(
response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
)
self.assertNotIn("initial", response_body["rooms"][room_id1])
# We should only see the name change.
self.assertEqual(
[
ev["event_id"]
for ev in response_body["rooms"][room_id1]["required_state"]
],
[name_change_id],
)
def test_rooms_required_state_incremental_sync_NEVER(self) -> None:
"""
Test getting `required_state` where we have NEVER sent down the room before
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
self.helper.send(room_id1, "msg", tok=user1_tok)
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 0]],
"required_state": [
[EventTypes.Create, ""],
[EventTypes.RoomHistoryVisibility, ""],
# This one doesn't exist in the room
[EventTypes.Name, ""],
],
"timeline_limit": 1,
}
},
}
# A message happens in the other room, so room1 won't get sent down.
self.helper.send(room_id2, "msg", tok=user1_tok)
# Only the second room gets sent down sync.
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
self.assertCountEqual(
response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
)
# We now send another event to room1, so we should send down the full
# room.
self.helper.send(room_id1, "msg2", tok=user1_tok)
# This sync should contain the messages from room1 not yet sent down.
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
self.assertCountEqual(
response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
)
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id1)
)
self._assertRequiredStateIncludes(
response_body["rooms"][room_id1]["required_state"],
{
state_map[(EventTypes.Create, "")],
state_map[(EventTypes.RoomHistoryVisibility, "")],
},
exact=True,
)
def test_rooms_timeline_incremental_sync_NEVER(self) -> None:
"""
Test getting timeline room data where we have NEVER sent down the room
before
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 0]],
"required_state": [],
"timeline_limit": 5,
}
},
}
expected_events = []
for _ in range(4):
resp = self.helper.send(room_id1, "msg", tok=user1_tok)
expected_events.append(resp["event_id"])
# A message happens in the other room, so room1 won't get sent down.
self.helper.send(room_id2, "msg", tok=user1_tok)
# Only the second room gets sent down sync.
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
self.assertCountEqual(
response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
)
# We now send another event to room1 so it comes down sync
resp = self.helper.send(room_id1, "msg2", tok=user1_tok)
expected_events.append(resp["event_id"])
# This sync should contain the messages from room1 not yet sent down.
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
self.assertCountEqual(
response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
)
self.assertEqual(
[ev["event_id"] for ev in response_body["rooms"][room_id1]["timeline"]],
expected_events,
)
self.assertEqual(response_body["rooms"][room_id1]["limited"], True)
self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase): class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
"""Tests for the to-device sliding sync extension""" """Tests for the to-device sliding sync extension"""