mirror of
https://mau.dev/maunium/synapse.git
synced 2024-10-01 01:36:05 -04:00
Also check if first event matches the last in prev batch (#17066)
Refinement of #17064 cc @richvdh
This commit is contained in:
parent
4ffed6330f
commit
89f1092284
1
changelog.d/17066.bugfix
Normal file
1
changelog.d/17066.bugfix
Normal file
@ -0,0 +1 @@
|
|||||||
|
Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations.
|
@ -1266,17 +1266,23 @@ class SyncHandler:
|
|||||||
#
|
#
|
||||||
# c.f. #16941 for an example of why we can't do this for all non-gappy
|
# c.f. #16941 for an example of why we can't do this for all non-gappy
|
||||||
# syncs.
|
# syncs.
|
||||||
is_linear_timeline = False
|
is_linear_timeline = True
|
||||||
if batch.events:
|
if batch.events:
|
||||||
prev_event_id = batch.events[0].event_id
|
# We need to make sure the first event in our batch points to the
|
||||||
for e in batch.events[1:]:
|
# last event in the previous batch.
|
||||||
|
last_event_id_prev_batch = (
|
||||||
|
await self.store.get_last_event_in_room_before_stream_ordering(
|
||||||
|
room_id,
|
||||||
|
end_token=since_token.room_key,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
prev_event_id = last_event_id_prev_batch
|
||||||
|
for e in batch.events:
|
||||||
if e.prev_event_ids() != [prev_event_id]:
|
if e.prev_event_ids() != [prev_event_id]:
|
||||||
|
is_linear_timeline = False
|
||||||
break
|
break
|
||||||
prev_event_id = e.event_id
|
prev_event_id = e.event_id
|
||||||
else:
|
|
||||||
is_linear_timeline = True
|
|
||||||
else:
|
|
||||||
is_linear_timeline = True
|
|
||||||
|
|
||||||
if is_linear_timeline and not batch.limited:
|
if is_linear_timeline and not batch.limited:
|
||||||
state_ids: StateMap[str] = {}
|
state_ids: StateMap[str] = {}
|
||||||
|
@ -435,6 +435,101 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
|
|||||||
[s2_event],
|
[s2_event],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_state_includes_changes_on_long_lived_forks(self) -> None:
|
||||||
|
"""State changes that happen on a fork of the DAG must be included in `state`
|
||||||
|
|
||||||
|
Given the following DAG:
|
||||||
|
|
||||||
|
E1
|
||||||
|
↗ ↖
|
||||||
|
| S2
|
||||||
|
| ↑
|
||||||
|
--|------|----
|
||||||
|
E3 |
|
||||||
|
--|------|----
|
||||||
|
| E4
|
||||||
|
| |
|
||||||
|
|
||||||
|
... and a filter that means we only return 1 event, represented by the dashed
|
||||||
|
horizontal lines: `S2` must be included in the `state` section on the second sync.
|
||||||
|
"""
|
||||||
|
alice = self.register_user("alice", "password")
|
||||||
|
alice_tok = self.login(alice, "password")
|
||||||
|
alice_requester = create_requester(alice)
|
||||||
|
room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)
|
||||||
|
|
||||||
|
# Do an initial sync as Alice to get a known starting point.
|
||||||
|
initial_sync_result = self.get_success(
|
||||||
|
self.sync_handler.wait_for_sync_for_user(
|
||||||
|
alice_requester, generate_sync_config(alice)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
last_room_creation_event_id = (
|
||||||
|
initial_sync_result.joined[0].timeline.events[-1].event_id
|
||||||
|
)
|
||||||
|
|
||||||
|
# Send a state event, and a regular event, both using the same prev ID
|
||||||
|
with self._patch_get_latest_events([last_room_creation_event_id]):
|
||||||
|
s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[
|
||||||
|
"event_id"
|
||||||
|
]
|
||||||
|
e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"]
|
||||||
|
|
||||||
|
# Do an incremental sync, this will return E3 but *not* S2 at this
|
||||||
|
# point.
|
||||||
|
incremental_sync = self.get_success(
|
||||||
|
self.sync_handler.wait_for_sync_for_user(
|
||||||
|
alice_requester,
|
||||||
|
generate_sync_config(
|
||||||
|
alice,
|
||||||
|
filter_collection=FilterCollection(
|
||||||
|
self.hs, {"room": {"timeline": {"limit": 1}}}
|
||||||
|
),
|
||||||
|
),
|
||||||
|
since_token=initial_sync_result.next_batch,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
room_sync = incremental_sync.joined[0]
|
||||||
|
self.assertEqual(room_sync.room_id, room_id)
|
||||||
|
self.assertTrue(room_sync.timeline.limited)
|
||||||
|
self.assertEqual(
|
||||||
|
[e.event_id for e in room_sync.timeline.events],
|
||||||
|
[e3_event],
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
[e.event_id for e in room_sync.state.values()],
|
||||||
|
[],
|
||||||
|
)
|
||||||
|
|
||||||
|
# Now send another event that points to S2, but not E3.
|
||||||
|
with self._patch_get_latest_events([s2_event]):
|
||||||
|
e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"]
|
||||||
|
|
||||||
|
# Doing an incremental sync should return S2 in state.
|
||||||
|
incremental_sync = self.get_success(
|
||||||
|
self.sync_handler.wait_for_sync_for_user(
|
||||||
|
alice_requester,
|
||||||
|
generate_sync_config(
|
||||||
|
alice,
|
||||||
|
filter_collection=FilterCollection(
|
||||||
|
self.hs, {"room": {"timeline": {"limit": 1}}}
|
||||||
|
),
|
||||||
|
),
|
||||||
|
since_token=incremental_sync.next_batch,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
room_sync = incremental_sync.joined[0]
|
||||||
|
self.assertEqual(room_sync.room_id, room_id)
|
||||||
|
self.assertFalse(room_sync.timeline.limited)
|
||||||
|
self.assertEqual(
|
||||||
|
[e.event_id for e in room_sync.timeline.events],
|
||||||
|
[e4_event],
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
[e.event_id for e in room_sync.state.values()],
|
||||||
|
[s2_event],
|
||||||
|
)
|
||||||
|
|
||||||
def test_state_includes_changes_on_ungappy_syncs(self) -> None:
|
def test_state_includes_changes_on_ungappy_syncs(self) -> None:
|
||||||
"""Test `state` where the sync is not gappy.
|
"""Test `state` where the sync is not gappy.
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user