mirror of
https://mau.dev/maunium/synapse.git
synced 2024-10-01 01:36:05 -04:00
Fix bug in calculating state for non-gappy syncs (#16942)
Unfortunately, the optimisation we applied here for non-gappy syncs is not actually valid. Fixes https://github.com/element-hq/synapse/issues/16941. ~~Based on https://github.com/element-hq/synapse/pull/16930.~~ Requires https://github.com/matrix-org/sytest/pull/1374.
This commit is contained in:
parent
230b709d9d
commit
0e68e9b7f4
@ -1 +1 @@
|
||||
Fix a long-standing bug which could cause state to be omitted from `/sync` responses when certain events are filtered out of the timeline.
|
||||
Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations.
|
||||
|
@ -1 +1 @@
|
||||
Fix a long-standing bug which could cause incorrect state to be returned from `/sync` for rooms where the user has left.
|
||||
Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations.
|
||||
|
1
changelog.d/16942.bugfix
Normal file
1
changelog.d/16942.bugfix
Normal file
@ -0,0 +1 @@
|
||||
Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations.
|
@ -1195,6 +1195,8 @@ class SyncHandler:
|
||||
)
|
||||
|
||||
if batch:
|
||||
# Strictly speaking, this returns the state *after* the first event in the
|
||||
# timeline, but that is good enough here.
|
||||
state_at_timeline_start = (
|
||||
await self._state_storage_controller.get_state_ids_for_event(
|
||||
batch.events[0].event_id,
|
||||
@ -1257,25 +1259,25 @@ class SyncHandler:
|
||||
await_full_state = True
|
||||
lazy_load_members = False
|
||||
|
||||
if batch.limited:
|
||||
if batch:
|
||||
state_at_timeline_start = (
|
||||
await self._state_storage_controller.get_state_ids_for_event(
|
||||
batch.events[0].event_id,
|
||||
state_filter=state_filter,
|
||||
await_full_state=await_full_state,
|
||||
)
|
||||
)
|
||||
else:
|
||||
# We can get here if the user has ignored the senders of all
|
||||
# the recent events.
|
||||
state_at_timeline_start = await self.get_state_at(
|
||||
room_id,
|
||||
stream_position=end_token,
|
||||
if batch:
|
||||
state_at_timeline_start = (
|
||||
await self._state_storage_controller.get_state_ids_for_event(
|
||||
batch.events[0].event_id,
|
||||
state_filter=state_filter,
|
||||
await_full_state=await_full_state,
|
||||
)
|
||||
)
|
||||
else:
|
||||
# We can get here if the user has ignored the senders of all
|
||||
# the recent events.
|
||||
state_at_timeline_start = await self.get_state_at(
|
||||
room_id,
|
||||
stream_position=end_token,
|
||||
state_filter=state_filter,
|
||||
await_full_state=await_full_state,
|
||||
)
|
||||
|
||||
if batch.limited:
|
||||
# for now, we disable LL for gappy syncs - see
|
||||
# https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
|
||||
# N.B. this slows down incr syncs as we are now processing way
|
||||
@ -1290,47 +1292,28 @@ class SyncHandler:
|
||||
# about them).
|
||||
state_filter = StateFilter.all()
|
||||
|
||||
state_at_previous_sync = await self.get_state_at(
|
||||
room_id,
|
||||
stream_position=since_token,
|
||||
state_filter=state_filter,
|
||||
await_full_state=await_full_state,
|
||||
)
|
||||
state_at_previous_sync = await self.get_state_at(
|
||||
room_id,
|
||||
stream_position=since_token,
|
||||
state_filter=state_filter,
|
||||
await_full_state=await_full_state,
|
||||
)
|
||||
|
||||
state_at_timeline_end = await self.get_state_at(
|
||||
room_id,
|
||||
stream_position=end_token,
|
||||
state_filter=state_filter,
|
||||
await_full_state=await_full_state,
|
||||
)
|
||||
state_at_timeline_end = await self.get_state_at(
|
||||
room_id,
|
||||
stream_position=end_token,
|
||||
state_filter=state_filter,
|
||||
await_full_state=await_full_state,
|
||||
)
|
||||
|
||||
state_ids = _calculate_state(
|
||||
timeline_contains=timeline_state,
|
||||
timeline_start=state_at_timeline_start,
|
||||
timeline_end=state_at_timeline_end,
|
||||
previous_timeline_end=state_at_previous_sync,
|
||||
lazy_load_members=lazy_load_members,
|
||||
)
|
||||
else:
|
||||
state_ids = {}
|
||||
if lazy_load_members:
|
||||
if members_to_fetch and batch.events:
|
||||
# We're returning an incremental sync, with no
|
||||
# "gap" since the previous sync, so normally there would be
|
||||
# no state to return.
|
||||
# But we're lazy-loading, so the client might need some more
|
||||
# member events to understand the events in this timeline.
|
||||
# So we fish out all the member events corresponding to the
|
||||
# timeline here. The caller will then dedupe any redundant ones.
|
||||
state_ids = _calculate_state(
|
||||
timeline_contains=timeline_state,
|
||||
timeline_start=state_at_timeline_start,
|
||||
timeline_end=state_at_timeline_end,
|
||||
previous_timeline_end=state_at_previous_sync,
|
||||
lazy_load_members=lazy_load_members,
|
||||
)
|
||||
|
||||
state_ids = await self._state_storage_controller.get_state_ids_for_event(
|
||||
batch.events[0].event_id,
|
||||
# we only want members!
|
||||
state_filter=StateFilter.from_types(
|
||||
(EventTypes.Member, member) for member in members_to_fetch
|
||||
),
|
||||
await_full_state=False,
|
||||
)
|
||||
return state_ids
|
||||
|
||||
async def _find_missing_partial_state_memberships(
|
||||
|
@ -435,6 +435,111 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
|
||||
[s2_event],
|
||||
)
|
||||
|
||||
def test_state_includes_changes_on_ungappy_syncs(self) -> None:
|
||||
"""Test `state` where the sync is not gappy.
|
||||
|
||||
We start with a DAG like this:
|
||||
|
||||
E1
|
||||
↗ ↖
|
||||
| S2
|
||||
|
|
||||
--|---
|
||||
|
|
||||
E3
|
||||
|
||||
... and initialsync with `limit=1`, represented by the horizontal dashed line.
|
||||
At this point, we do not expect S2 to appear in the response at all (since
|
||||
it is excluded from the timeline by the `limit`, and the state is based on the
|
||||
state after the most recent event before the sync token (E3), which doesn't
|
||||
include S2.
|
||||
|
||||
Now more events arrive, and we do an incremental sync:
|
||||
|
||||
E1
|
||||
↗ ↖
|
||||
| S2
|
||||
| ↑
|
||||
E3 |
|
||||
↑ |
|
||||
--|------|----
|
||||
| |
|
||||
E4 |
|
||||
↖ /
|
||||
E5
|
||||
|
||||
This is the last chance for us to tell the client about S2, so it *must* be
|
||||
included in the response.
|
||||
"""
|
||||
alice = self.register_user("alice", "password")
|
||||
alice_tok = self.login(alice, "password")
|
||||
alice_requester = create_requester(alice)
|
||||
room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)
|
||||
|
||||
# Do an initial sync to get a known starting point.
|
||||
initial_sync_result = self.get_success(
|
||||
self.sync_handler.wait_for_sync_for_user(
|
||||
alice_requester, generate_sync_config(alice)
|
||||
)
|
||||
)
|
||||
last_room_creation_event_id = (
|
||||
initial_sync_result.joined[0].timeline.events[-1].event_id
|
||||
)
|
||||
|
||||
# Send a state event, and a regular event, both using the same prev ID
|
||||
with self._patch_get_latest_events([last_room_creation_event_id]):
|
||||
s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[
|
||||
"event_id"
|
||||
]
|
||||
e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"]
|
||||
|
||||
# Another initial sync, with limit=1
|
||||
initial_sync_result = self.get_success(
|
||||
self.sync_handler.wait_for_sync_for_user(
|
||||
alice_requester,
|
||||
generate_sync_config(
|
||||
alice,
|
||||
filter_collection=FilterCollection(
|
||||
self.hs, {"room": {"timeline": {"limit": 1}}}
|
||||
),
|
||||
),
|
||||
)
|
||||
)
|
||||
room_sync = initial_sync_result.joined[0]
|
||||
self.assertEqual(room_sync.room_id, room_id)
|
||||
self.assertEqual(
|
||||
[e.event_id for e in room_sync.timeline.events],
|
||||
[e3_event],
|
||||
)
|
||||
self.assertNotIn(s2_event, [e.event_id for e in room_sync.state.values()])
|
||||
|
||||
# More events, E4 and E5
|
||||
with self._patch_get_latest_events([e3_event]):
|
||||
e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"]
|
||||
e5_event = self.helper.send(room_id, "e5", tok=alice_tok)["event_id"]
|
||||
|
||||
# Now incremental sync
|
||||
incremental_sync = self.get_success(
|
||||
self.sync_handler.wait_for_sync_for_user(
|
||||
alice_requester,
|
||||
generate_sync_config(alice),
|
||||
since_token=initial_sync_result.next_batch,
|
||||
)
|
||||
)
|
||||
|
||||
# The state event should appear in the 'state' section of the response.
|
||||
room_sync = incremental_sync.joined[0]
|
||||
self.assertEqual(room_sync.room_id, room_id)
|
||||
self.assertFalse(room_sync.timeline.limited)
|
||||
self.assertEqual(
|
||||
[e.event_id for e in room_sync.timeline.events],
|
||||
[e4_event, e5_event],
|
||||
)
|
||||
self.assertEqual(
|
||||
[e.event_id for e in room_sync.state.values()],
|
||||
[s2_event],
|
||||
)
|
||||
|
||||
@parameterized.expand(
|
||||
[
|
||||
(False, False),
|
||||
|
Loading…
Reference in New Issue
Block a user