diff --git a/changelog.d/15248.bugfix b/changelog.d/15248.bugfix new file mode 100644 index 000000000..8665acb49 --- /dev/null +++ b/changelog.d/15248.bugfix @@ -0,0 +1 @@ +Fix a rare bug introduced in Synapse 1.73 where events could remain unsent to other homeservers after a faster-join to a room. diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 478187ce4..31c5c2b7d 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -497,8 +497,8 @@ class PerDestinationQueue: # # Note: `catchup_pdus` will have exactly one PDU per room. for pdu in catchup_pdus: - # The PDU from the DB will be the last PDU in the room from - # *this server* that wasn't sent to the remote. However, other + # The PDU from the DB will be the newest PDU in the room from + # *this server* that we tried---but were unable---to send to the remote. # servers may have sent lots of events since then, and we want # to try and tell the remote only about the *latest* events in # the room. This is so that it doesn't get inundated by events @@ -516,6 +516,11 @@ class PerDestinationQueue: # If the event is in the extremities, then great! We can just # use that without having to do further checks. room_catchup_pdus = [pdu] + elif await self._store.is_partial_state_room(pdu.room_id): + # We can't be sure which events the destination should + # see using only partial state. Avoid doing so, and just retry + # sending our the newest PDU the remote is missing from us. + room_catchup_pdus = [pdu] else: # If not, fetch the extremities and figure out which we can # send. diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index 6381583c2..391ae5170 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -1,4 +1,5 @@ -from typing import Callable, List, Optional, Tuple +from typing import Callable, Collection, List, Optional, Tuple +from unittest import mock from unittest.mock import Mock from twisted.test.proto_helpers import MemoryReactor @@ -500,3 +501,87 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase): self.assertEqual(len(sent_pdus), 1) self.assertEqual(sent_pdus[0].event_id, event_2.event_id) self.assertFalse(per_dest_queue._catching_up) + + def test_catch_up_is_not_blocked_by_remote_event_in_partial_state_room( + self, + ) -> None: + """Detects (part of?) https://github.com/matrix-org/synapse/issues/15220.""" + # ARRANGE: + # - a local user (u1) + # - a room which contains u1 and two remote users, @u2:host2 and @u3:other + # - events in that room such that + # - history visibility is restricted + # - u1 sent message events e1 and e2 + # - afterwards, u3 sent a remote event e3 + # - catchup to begin for host2; last successfully sent event was e1 + per_dest_queue, sent_pdus = self.make_fake_destination_queue() + + self.register_user("u1", "you the one") + u1_token = self.login("u1", "you the one") + room = self.helper.create_room_as("u1", tok=u1_token) + self.helper.send_state( + room_id=room, + event_type="m.room.history_visibility", + body={"history_visibility": "joined"}, + tok=u1_token, + ) + self.get_success( + event_injection.inject_member_event(self.hs, room, "@u2:host2", "join") + ) + self.get_success( + event_injection.inject_member_event(self.hs, room, "@u3:other", "join") + ) + + # create some events + event_id_1 = self.helper.send(room, "hello", tok=u1_token)["event_id"] + event_id_2 = self.helper.send(room, "world", tok=u1_token)["event_id"] + # pretend that u3 changes their displayname + event_id_3 = self.get_success( + event_injection.inject_member_event(self.hs, room, "@u3:other", "join") + ).event_id + + # destination_rooms should already be populated, but let us pretend that we already + # sent (successfully) up to and including event id 1 + event_1 = self.get_success(self.hs.get_datastores().main.get_event(event_id_1)) + assert event_1.internal_metadata.stream_ordering is not None + self.get_success( + self.hs.get_datastores().main.set_destination_last_successful_stream_ordering( + "host2", event_1.internal_metadata.stream_ordering + ) + ) + + # also fetch event 2 so we can compare its stream ordering to the sender's + # last_successful_stream_ordering later + event_2 = self.get_success(self.hs.get_datastores().main.get_event(event_id_2)) + + # Mock event 3 as having partial state + self.get_success( + event_injection.mark_event_as_partial_state(self.hs, event_id_3, room) + ) + + # Fail the test if we block on full state for event 3. + async def mock_await_full_state(event_ids: Collection[str]) -> None: + if event_id_3 in event_ids: + raise AssertionError("Tried to await full state for event_id_3") + + # ACT + with mock.patch.object( + self.hs.get_storage_controllers().state._partial_state_events_tracker, + "await_full_state", + mock_await_full_state, + ): + self.get_success(per_dest_queue._catch_up_transmission_loop()) + + # ASSERT + # We should have: + # - not sent event 3: it's not ours, and the room is partial stated + # - fallen back to sending event 2: it's the most recent event in the room + # we tried to send to host2 + # - completed catch-up + self.assertEqual(len(sent_pdus), 1) + self.assertEqual(sent_pdus[0].event_id, event_id_2) + self.assertFalse(per_dest_queue._catching_up) + self.assertEqual( + per_dest_queue._last_successful_stream_ordering, + event_2.internal_metadata.stream_ordering, + ) diff --git a/tests/test_utils/event_injection.py b/tests/test_utils/event_injection.py index a6330ed84..9679904c3 100644 --- a/tests/test_utils/event_injection.py +++ b/tests/test_utils/event_injection.py @@ -102,3 +102,34 @@ async def create_event( context = await unpersisted_context.persist(event) return event, context + + +async def mark_event_as_partial_state( + hs: synapse.server.HomeServer, + event_id: str, + room_id: str, +) -> None: + """ + (Falsely) mark an event as having partial state. + + Naughty, but occasionally useful when checking that partial state doesn't + block something from happening. + + If the event already has partial state, this insert will fail (event_id is unique + in this table). + """ + store = hs.get_datastores().main + await store.db_pool.simple_upsert( + table="partial_state_rooms", + keyvalues={"room_id": room_id}, + values={}, + insertion_values={"room_id": room_id}, + ) + + await store.db_pool.simple_insert( + table="partial_state_events", + values={ + "room_id": room_id, + "event_id": event_id, + }, + )