Fix bug in /sync response for archived rooms (#16932)

This PR fixes a very, very niche edge-case, but I've got some more work
coming which will otherwise make the problem worse.

The bug happens when the syncing user leaves a room, and has a sync
filter which includes "left" rooms, but sets the timeline limit to 0. In
that case, the state returned in the `state` section is calculated
incorrectly.

The fix is to pass a token corresponding to the point that the user
leaves the room through to `compute_state_delta`.
This commit is contained in:
Richard van der Hoff 2024-04-04 12:47:59 +01:00 committed by GitHub
parent 31122b71bc
commit 05957ac70f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 314 additions and 34 deletions

1
changelog.d/16932.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a long-standing bug which could cause incorrect state to be returned from `/sync` for rooms where the user has left.

View File

@ -953,7 +953,7 @@ class SyncHandler:
batch: TimelineBatch,
sync_config: SyncConfig,
since_token: Optional[StreamToken],
now_token: StreamToken,
end_token: StreamToken,
full_state: bool,
) -> MutableStateMap[EventBase]:
"""Works out the difference in state between the end of the previous sync and
@ -964,7 +964,9 @@ class SyncHandler:
batch: The timeline batch for the room that will be sent to the user.
sync_config:
since_token: Token of the end of the previous batch. May be `None`.
now_token: Token of the end of the current batch.
end_token: Token of the end of the current batch. Normally this will be
the same as the global "now_token", but if the user has left the room,
the point just after their leave event.
full_state: Whether to force returning the full state.
`lazy_load_members` still applies when `full_state` is `True`.
@ -1044,7 +1046,7 @@ class SyncHandler:
room_id,
sync_config.user,
batch,
now_token,
end_token,
members_to_fetch,
timeline_state,
)
@ -1058,7 +1060,7 @@ class SyncHandler:
room_id,
batch,
since_token,
now_token,
end_token,
members_to_fetch,
timeline_state,
)
@ -1130,7 +1132,7 @@ class SyncHandler:
room_id: str,
syncing_user: UserID,
batch: TimelineBatch,
now_token: StreamToken,
end_token: StreamToken,
members_to_fetch: Optional[Set[str]],
timeline_state: StateMap[str],
) -> StateMap[str]:
@ -1143,7 +1145,9 @@ class SyncHandler:
room_id: The room we are calculating for.
syncing_user: The user that is calling `/sync`.
batch: The timeline batch for the room that will be sent to the user.
now_token: Token of the end of the current batch.
end_token: Token of the end of the current batch. Normally this will be
the same as the global "now_token", but if the user has left the room,
the point just after their leave event.
members_to_fetch: If lazy-loading is enabled, the memberships needed for
events in the timeline.
timeline_state: The contribution to the room state from state events in
@ -1202,7 +1206,7 @@ class SyncHandler:
else:
state_at_timeline_end = await self.get_state_at(
room_id,
stream_position=now_token,
stream_position=end_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
@ -1223,7 +1227,7 @@ class SyncHandler:
room_id: str,
batch: TimelineBatch,
since_token: StreamToken,
now_token: StreamToken,
end_token: StreamToken,
members_to_fetch: Optional[Set[str]],
timeline_state: StateMap[str],
) -> StateMap[str]:
@ -1239,7 +1243,9 @@ class SyncHandler:
room_id: The room we are calculating for.
batch: The timeline batch for the room that will be sent to the user.
since_token: Token of the end of the previous batch.
now_token: Token of the end of the current batch.
end_token: Token of the end of the current batch. Normally this will be
the same as the global "now_token", but if the user has left the room,
the point just after their leave event.
members_to_fetch: If lazy-loading is enabled, the memberships needed for
events in the timeline. Otherwise, `None`.
timeline_state: The contribution to the room state from state events in
@ -1273,7 +1279,7 @@ class SyncHandler:
# the recent events.
state_at_timeline_start = await self.get_state_at(
room_id,
stream_position=now_token,
stream_position=end_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
@ -1312,7 +1318,7 @@ class SyncHandler:
# the recent events.
state_at_timeline_end = await self.get_state_at(
room_id,
stream_position=now_token,
stream_position=end_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
@ -2344,6 +2350,7 @@ class SyncHandler:
full_state=False,
since_token=since_token,
upto_token=leave_token,
end_token=leave_token,
out_of_band=leave_event.internal_metadata.is_out_of_band_membership(),
)
)
@ -2381,6 +2388,7 @@ class SyncHandler:
full_state=False,
since_token=None if newly_joined else since_token,
upto_token=prev_batch_token,
end_token=now_token,
)
else:
entry = RoomSyncResultBuilder(
@ -2391,6 +2399,7 @@ class SyncHandler:
full_state=False,
since_token=since_token,
upto_token=since_token,
end_token=now_token,
)
room_entries.append(entry)
@ -2449,6 +2458,7 @@ class SyncHandler:
full_state=True,
since_token=since_token,
upto_token=now_token,
end_token=now_token,
)
)
elif event.membership == Membership.INVITE:
@ -2478,6 +2488,7 @@ class SyncHandler:
full_state=True,
since_token=since_token,
upto_token=leave_token,
end_token=leave_token,
)
)
@ -2548,6 +2559,7 @@ class SyncHandler:
{
"since_token": since_token,
"upto_token": upto_token,
"end_token": room_builder.end_token,
}
)
@ -2621,7 +2633,7 @@ class SyncHandler:
batch,
sync_config,
since_token,
now_token,
room_builder.end_token,
full_state=full_state,
)
else:
@ -2781,6 +2793,70 @@ def _calculate_state(
e for t, e in timeline_start.items() if t[0] == EventTypes.Member
)
# Naively, we would just return the difference between the state at the start
# of the timeline (`timeline_start_ids`) and that at the end of the previous sync
# (`previous_timeline_end_ids`). However, that fails in the presence of forks in
# the DAG.
#
# For example, consider a DAG such as the following:
#
#         E1
#       ↗    ↖
#      |      S2
#      |      ↑
#    --|------|----
#      |      |
#      E3     |
#       ↖    /
#         E4
#
# ... and a filter that means we only return 2 events, represented by the dashed
# horizontal line. Assuming S2 was *not* included in the previous sync, we need to
# include it in the `state` section.
#
# Note that the state at the start of the timeline (E3) does not include S2. So,
# to make sure it gets included in the calculation here, we actually look at
# the state at the *end* of the timeline, and subtract any events that are present
# in the timeline.
#
# ----------
#
# Aside 1: You may then wonder if we need to include `timeline_start` in the
# calculation. Consider a linear DAG:
#
#         E1
#         ↑
#         S2
#         ↑
#     ----|------
#         |
#         E3
#         ↑
#         S4
#         ↑
#         E5
#
# ... where S2 and S4 change the same piece of state; and where we have a filter
# that returns 3 events (E3, S4, E5). We still need to tell the client about S2,
# because it might affect the display of E3. However, the state at the end of the
# timeline only tells us about S4; if we don't inspect `timeline_start` we won't
# find out about S2.
#
# (There are yet more complicated cases in which a state event is excluded from the
# timeline, but whose effect actually lands in the DAG in the *middle* of the
# timeline. We have no way to represent that in the /sync response, and we don't
# even try; it is either omitted or plonked into `state` as if it were at the start
# of the timeline, depending on what else is in the timeline.)
#
# ----------
#
# Aside 2: it's worth noting that `timeline_end`, as provided to us, is actually
# the state *before* the final event in the timeline. In other words: if the final
# event in the timeline is a state event, it won't be included in `timeline_end`.
# However, that doesn't matter here, because the only difference can be in that
# one piece of state, and by definition that event is in the timeline, so we
# don't need to include it in the `state` section.
state_ids = (
(timeline_end_ids | timeline_start_ids)
- previous_timeline_end_ids
@ -2883,13 +2959,30 @@ class RoomSyncResultBuilder:
Attributes:
room_id
rtype: One of `"joined"` or `"archived"`
events: List of events to include in the room (more events may be added
when generating result).
newly_joined: If the user has newly joined the room
full_state: Whether the full state should be sent in result
since_token: Earliest point to return events from, or None
upto_token: Latest point to return events from.
upto_token: Latest point to return events from. If `events` is populated,
this is set to the token at the start of `events`
end_token: The last point in the timeline that the client should see events
from. Normally this will be the same as the global `now_token`, but in
the case of rooms where the user has left the room, this will be the point
just after their leave event.
This is used in the calculation of the state which is returned in `state`:
any state changes *up to* `end_token` (and not beyond!) which are not
reflected in the timeline need to be returned in `state`.
out_of_band: whether the events in the room are "out of band" events
and the server isn't in the room.
"""
@ -2901,5 +2994,5 @@ class RoomSyncResultBuilder:
full_state: bool
since_token: Optional[StreamToken]
upto_token: StreamToken
end_token: StreamToken
out_of_band: bool = False

View File

@ -17,14 +17,16 @@
# [This file includes modifications made by New Vector Limited]
#
#
from typing import Collection, List, Optional
from typing import Collection, ContextManager, List, Optional
from unittest.mock import AsyncMock, Mock, patch
from parameterized import parameterized
from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EventTypes, JoinRules
from synapse.api.errors import Codes, ResourceLimitError
from synapse.api.filtering import Filtering
from synapse.api.filtering import FilterCollection, Filtering
from synapse.api.room_versions import RoomVersion, RoomVersions
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
@ -33,7 +35,7 @@ from synapse.handlers.sync import SyncConfig, SyncResult
from synapse.rest import admin
from synapse.rest.client import knock, login, room
from synapse.server import HomeServer
from synapse.types import UserID, create_requester
from synapse.types import JsonDict, UserID, create_requester
from synapse.util import Clock
import tests.unittest
@ -258,13 +260,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
# Eve tries to join the room. We monkey patch the internal logic which selects
# the prev_events used when creating the join event, such that the ban does not
# precede the join.
mocked_get_prev_events = patch.object(
self.hs.get_datastores().main,
"get_prev_events_for_room",
new_callable=AsyncMock,
return_value=[last_room_creation_event_id],
)
with mocked_get_prev_events:
with self._patch_get_latest_events([last_room_creation_event_id]):
self.helper.join(room_id, eve, tok=eve_token)
# Eve makes a second, incremental sync.
@ -288,6 +284,180 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
)
self.assertEqual(eve_initial_sync_after_join.joined, [])
def test_state_includes_changes_on_forks(self) -> None:
    """A state change made on a fork of the DAG must show up in `state`.

    The test builds the following DAG:

         E1
       ↗    ↖
      |      S2
      |      ↑
    --|------|----
      |      |
      E3     |
       ↖    /
         E4

    The dashed horizontal line marks the cut-off imposed by a filter that only
    lets 2 events through to the timeline. `S2` falls outside the timeline, so
    it must be delivered in the `state` section instead.
    """
    user_id = self.register_user("alice", "password")
    access_token = self.login(user_id, "password")
    requester = create_requester(user_id)
    room_id = self.helper.create_room_as(user_id, is_public=True, tok=access_token)

    # Sync once, without a since token, so that we have a known starting point.
    first_config = generate_sync_config(user_id)
    first_sync = self.get_success(
        self.sync_handler.wait_for_sync_for_user(requester, first_config)
    )
    fork_point_id = first_sync.joined[0].timeline.events[-1].event_id

    # Create the fork: the state event and the message both get the same
    # prev event.
    with self._patch_get_latest_events([fork_point_id]):
        s2_id = self.helper.send_state(room_id, "s2", {}, tok=access_token)[
            "event_id"
        ]
        e3_id = self.helper.send(room_id, "e3", tok=access_token)["event_id"]

    # One more event, which joins the two branches of the DAG back together.
    e4_id = self.helper.send(room_id, "e4", tok=access_token)["event_id"]

    # Sync incrementally with a timeline limit of 2, so that only two of the
    # three new events fit in the timeline.
    two_event_filter = FilterCollection(
        self.hs, {"room": {"timeline": {"limit": 2}}}
    )
    limited_config = generate_sync_config(
        user_id, filter_collection=two_event_filter
    )
    limited_sync = self.get_success(
        self.sync_handler.wait_for_sync_for_user(
            requester,
            limited_config,
            since_token=first_sync.next_batch,
        )
    )

    room_result = limited_sync.joined[0]
    self.assertEqual(room_result.room_id, room_id)
    self.assertTrue(room_result.timeline.limited)

    timeline_ids = [ev.event_id for ev in room_result.timeline.events]
    self.assertEqual(timeline_ids, [e3_id, e4_id])

    # The state event did not make it into the timeline, so it has to be
    # reported in the 'state' section of the response.
    state_ids = [ev.event_id for ev in room_result.state.values()]
    self.assertEqual(state_ids, [s2_id])
@parameterized.expand(
    [(False, False), (True, False), (False, True), (True, True)]
)
def test_archived_rooms_do_not_include_state_after_leave(
    self, initial_sync: bool, empty_timeline: bool
) -> None:
    """State changes made after the user left a room must not be returned.

    The scenario is run with a timeline limit of zero and with the default
    limit, and for each of those as both an initial and an incremental sync.

    Args:
        initial_sync: if True, Bob's final sync is made without a since token;
            otherwise it continues from the sync he made just after joining.
        empty_timeline: if True, the sync filter sets the timeline limit to 0.
    """
    if not initial_sync and empty_timeline:
        # FIXME synapse doesn't return the room at all in this situation!
        self.skipTest("Synapse does not correctly handle this case")

    # Set-up: Alice owns the room, Bob is a member of it.
    alice_id = self.register_user("alice", "password")
    alice_token = self.login(alice_id, "password")
    bob_id = self.register_user("bob", "password")
    bob_token = self.login(bob_id, "password")
    requester = create_requester(bob_id)

    room_id = self.helper.create_room_as(alice_id, is_public=True, tok=alice_token)
    self.helper.join(room_id, bob_id, tok=bob_token)

    # Bob syncs once while joined; the incremental variants continue from here.
    joined_sync = self.get_success(
        self.sync_handler.wait_for_sync_for_user(
            requester, generate_sync_config(bob_id)
        )
    )

    # Activity that Bob is entitled to see: a message and a state change ...
    before_message_id = self.helper.send(room_id, "before", tok=alice_token)[
        "event_id"
    ]
    before_state_id = self.helper.send_state(
        room_id, "test_state", {"body": "before"}, tok=alice_token
    )["event_id"]

    # ... followed by Bob's own departure.
    leave_id = self.helper.leave(room_id, bob_id, tok=bob_token)["event_id"]

    # Activity after the leave, which must not leak into Bob's sync.
    self.helper.send(room_id, "after", tok=alice_token)["event_id"]
    self.helper.send_state(
        room_id, "test_state", {"body": "after"}, tok=alice_token
    )["event_id"]

    # Build the filter for Bob's resync: include rooms he has left, and
    # optionally request an empty timeline.
    room_filter: JsonDict = {"include_leave": True}
    if empty_timeline:
        room_filter["timeline"] = {"limit": 0}
    sync_filter = FilterCollection(self.hs, {"room": room_filter})

    resync = self.get_success(
        self.sync_handler.wait_for_sync_for_user(
            requester,
            generate_sync_config(bob_id, filter_collection=sync_filter),
            since_token=None if initial_sync else joined_sync.next_batch,
        )
    )
    archived_room = resync.archived[0]

    if not empty_timeline:
        # The timeline should finish with the three events leading up to (and
        # including) the leave ...
        recent_ids = [ev.event_id for ev in archived_room.timeline.events[-3:]]
        self.assertEqual(recent_ids, [before_message_id, before_state_id, leave_id])
        # ... which leaves nothing to report in `state`.
        self.assertEqual(archived_room.state, {})
        return

    # With a zero limit nothing may appear in the timeline ...
    self.assertEqual(archived_room.timeline.events, [])
    # ... so `state` has to carry Bob's leave event ...
    self.assertEqual(
        archived_room.state[("m.room.member", bob_id)].event_id, leave_id
    )
    # ... along with the state change made before he left (not the later one).
    self.assertEqual(
        archived_room.state[("test_state", "")].event_id, before_state_id
    )
def _patch_get_latest_events(self, latest_events: List[str]) -> ContextManager:
    """Build a patcher that overrides `get_prev_events_for_room`.

    Args:
        latest_events: the event IDs that the patched
            `get_prev_events_for_room` should return.

    Returns:
        A context manager. While it is active, `get_prev_events_for_room` on
        the main datastore is replaced by an `AsyncMock` which returns
        `latest_events`.
    """
    main_store = self.hs.get_datastores().main
    return patch.object(
        main_store,
        "get_prev_events_for_room",
        new_callable=AsyncMock,
        return_value=latest_events,
    )
def test_call_invite_in_public_room_not_returned(self) -> None:
user = self.register_user("alice", "password")
tok = self.login(user, "password")
@ -401,14 +571,26 @@ _request_key = 0
def generate_sync_config(
user_id: str, device_id: Optional[str] = "device_id"
user_id: str,
device_id: Optional[str] = "device_id",
filter_collection: Optional[FilterCollection] = None,
) -> SyncConfig:
"""Generate a sync config (with a unique request key)."""
"""Generate a sync config (with a unique request key).
Args:
user_id: user who is syncing.
device_id: device that is syncing. Defaults to "device_id".
filter_collection: filter to apply. Defaults to the default filter (ie,
return everything, with a default limit)
"""
if filter_collection is None:
filter_collection = Filtering(Mock()).DEFAULT_FILTER_COLLECTION
global _request_key
_request_key += 1
return SyncConfig(
user=UserID.from_string(user_id),
filter_collection=Filtering(Mock()).DEFAULT_FILTER_COLLECTION,
filter_collection=filter_collection,
is_guest=False,
request_key=("request_key", _request_key),
device_id=device_id,

View File

@ -170,8 +170,8 @@ class RestHelper:
targ: Optional[str] = None,
expect_code: int = HTTPStatus.OK,
tok: Optional[str] = None,
) -> None:
self.change_membership(
) -> JsonDict:
return self.change_membership(
room=room,
src=src,
targ=targ,
@ -189,8 +189,8 @@ class RestHelper:
appservice_user_id: Optional[str] = None,
expect_errcode: Optional[Codes] = None,
expect_additional_fields: Optional[dict] = None,
) -> None:
self.change_membership(
) -> JsonDict:
return self.change_membership(
room=room,
src=user,
targ=user,
@ -242,8 +242,8 @@ class RestHelper:
user: Optional[str] = None,
expect_code: int = HTTPStatus.OK,
tok: Optional[str] = None,
) -> None:
self.change_membership(
) -> JsonDict:
return self.change_membership(
room=room,
src=user,
targ=user,
@ -282,7 +282,7 @@ class RestHelper:
expect_code: int = HTTPStatus.OK,
expect_errcode: Optional[str] = None,
expect_additional_fields: Optional[dict] = None,
) -> None:
) -> JsonDict:
"""
Send a membership state event into a room.
@ -298,6 +298,9 @@ class RestHelper:
using an application service access token in `tok`.
expect_code: The expected HTTP response code
expect_errcode: The expected Matrix error code
Returns:
The JSON response
"""
temp_id = self.auth_user_id
self.auth_user_id = src
@ -356,6 +359,7 @@ class RestHelper:
)
self.auth_user_id = temp_id
return channel.json_body
def send(
self,