diff --git a/changelog.d/14474.misc b/changelog.d/14474.misc new file mode 100644 index 000000000..deccd4e91 --- /dev/null +++ b/changelog.d/14474.misc @@ -0,0 +1 @@ +Faster remote room joins: stream the un-partial-stating of rooms over replication. \ No newline at end of file diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 18252a295..b4dad47b4 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -36,12 +36,14 @@ from synapse.replication.tcp.streams import ( TagAccountDataStream, ToDeviceStream, TypingStream, + UnPartialStatedRoomStream, ) from synapse.replication.tcp.streams.events import ( EventsStream, EventsStreamEventRow, EventsStreamRow, ) +from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStreamRow from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID from synapse.util.async_helpers import Linearizer, timeout_deferred from synapse.util.metrics import Measure @@ -117,6 +119,7 @@ class ReplicationDataHandler: self._streams = hs.get_replication_streams() self._instance_name = hs.get_instance_name() self._typing_handler = hs.get_typing_handler() + self._state_storage_controller = hs.get_storage_controllers().state self._notify_pushers = hs.config.worker.start_pushers self._pusher_pool = hs.get_pusherpool() @@ -236,6 +239,14 @@ class ReplicationDataHandler: self.notifier.notify_user_joined_room( row.data.event_id, row.data.room_id ) + elif stream_name == UnPartialStatedRoomStream.NAME: + for row in rows: + assert isinstance(row, UnPartialStatedRoomStreamRow) + + # Wake up any tasks waiting for the room to be un-partial-stated. + self._state_storage_controller.notify_room_un_partial_stated( + row.room_id + ) await self._presence_handler.process_replication_rows( stream_name, instance_name, token, rows diff --git a/tests/replication/tcp/streams/test_partial_state.py b/tests/replication/tcp/streams/test_partial_state.py new file mode 100644 index 000000000..2c10eab4d --- /dev/null +++ b/tests/replication/tcp/streams/test_partial_state.py @@ -0,0 +1,65 @@ +# Copyright 2022 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from twisted.internet.defer import ensureDeferred + +from synapse.rest.client import room + +from tests.replication._base import BaseMultiWorkerStreamTestCase + + +class PartialStateStreamsTestCase(BaseMultiWorkerStreamTestCase): + servlets = [room.register_servlets] + hijack_auth = True + user_id = "@bob:test" + + def setUp(self): + super().setUp() + self.store = self.hs.get_datastores().main + + def test_un_partial_stated_room_unblocks_over_replication(self) -> None: + """ + Tests that, when a room is un-partial-stated on another worker, + pending calls to `await_full_state` get unblocked. + """ + + # Make a room. + room_id = self.helper.create_room_as("@bob:test") + # Mark the room as partial-stated. + self.get_success( + self.store.store_partial_state_room(room_id, ["serv1", "serv2"], 0, "serv1") + ) + + worker = self.make_worker_hs("synapse.app.generic_worker") + + # On the worker, attempt to get the current hosts in the room + d = ensureDeferred( + worker.get_storage_controllers().state.get_current_hosts_in_room(room_id) + ) + + self.reactor.advance(0.1) + + # This should block + self.assertFalse( + d.called, "get_current_hosts_in_room/await_full_state did not block" + ) + + # On the master, clear the partial state flag. + self.get_success(self.store.clear_partial_state_room(room_id)) + + self.reactor.advance(0.1) + + # The worker should have unblocked + self.assertTrue( + d.called, "get_current_hosts_in_room/await_full_state did not unblock" + )