import logging from typing import List, Tuple from unittest.mock import Mock, patch from twisted.test.proto_helpers import MemoryReactor from synapse.api.constants import EventContentFields, EventTypes from synapse.appservice import ApplicationService from synapse.rest import admin from synapse.rest.client import login, register, room, room_batch, sync from synapse.server import HomeServer from synapse.types import JsonDict, RoomStreamToken from synapse.util import Clock from tests import unittest logger = logging.getLogger(__name__) def _create_join_state_events_for_batch_send_request( virtual_user_ids: List[str], insert_time: int, ) -> List[JsonDict]: return [ { "type": EventTypes.Member, "sender": virtual_user_id, "origin_server_ts": insert_time, "content": { "membership": "join", "displayname": "display-name-for-%s" % (virtual_user_id,), }, "state_key": virtual_user_id, } for virtual_user_id in virtual_user_ids ] def _create_message_events_for_batch_send_request( virtual_user_id: str, insert_time: int, count: int ) -> List[JsonDict]: return [ { "type": EventTypes.Message, "sender": virtual_user_id, "origin_server_ts": insert_time, "content": { "msgtype": "m.text", "body": "Historical %d" % (i), EventContentFields.MSC2716_HISTORICAL: True, }, } for i in range(count) ] class RoomBatchTestCase(unittest.HomeserverTestCase): """Test importing batches of historical messages.""" servlets = [ admin.register_servlets_for_client_rest_resource, room_batch.register_servlets, room.register_servlets, register.register_servlets, login.register_servlets, sync.register_servlets, ] def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: config = self.default_config() self.appservice = ApplicationService( token="i_am_an_app_service", id="1234", namespaces={"users": [{"regex": r"@as_user.*", "exclusive": True}]}, # Note: this user does not have to match the regex above sender="@as_main:test", ) mock_load_appservices = Mock(return_value=[self.appservice]) with patch( "synapse.storage.databases.main.appservice.load_appservices", mock_load_appservices, ): hs = self.setup_test_homeserver(config=config) return hs def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.clock = clock self._storage_controllers = hs.get_storage_controllers() self.virtual_user_id, _ = self.register_appservice_user( "as_user_potato", self.appservice.token ) def _create_test_room(self) -> Tuple[str, str, str, str]: room_id = self.helper.create_room_as( self.appservice.sender, tok=self.appservice.token ) res_a = self.helper.send_event( room_id=room_id, type=EventTypes.Message, content={ "msgtype": "m.text", "body": "A", }, tok=self.appservice.token, ) event_id_a = res_a["event_id"] res_b = self.helper.send_event( room_id=room_id, type=EventTypes.Message, content={ "msgtype": "m.text", "body": "B", }, tok=self.appservice.token, ) event_id_b = res_b["event_id"] res_c = self.helper.send_event( room_id=room_id, type=EventTypes.Message, content={ "msgtype": "m.text", "body": "C", }, tok=self.appservice.token, ) event_id_c = res_c["event_id"] return room_id, event_id_a, event_id_b, event_id_c @unittest.override_config({"experimental_features": {"msc2716_enabled": True}}) def test_same_state_groups_for_whole_historical_batch(self) -> None: """Make sure that when using the `/batch_send` endpoint to import a bunch of historical messages, it re-uses the same `state_group` across the whole batch. This is an easy optimization to make sure we're getting right because the state for the whole batch is contained in `state_events_at_start` and can be shared across everything. """ time_before_room = int(self.clock.time_msec()) room_id, event_id_a, _, _ = self._create_test_room() channel = self.make_request( "POST", "/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s" % (room_id, event_id_a), content={ "events": _create_message_events_for_batch_send_request( self.virtual_user_id, time_before_room, 3 ), "state_events_at_start": _create_join_state_events_for_batch_send_request( [self.virtual_user_id], time_before_room ), }, access_token=self.appservice.token, ) self.assertEqual(channel.code, 200, channel.result) # Get the historical event IDs that we just imported historical_event_ids = channel.json_body["event_ids"] self.assertEqual(len(historical_event_ids), 3) # Fetch the state_groups state_group_map = self.get_success( self._storage_controllers.state.get_state_groups_ids( room_id, historical_event_ids ) ) # We expect all of the historical events to be using the same state_group # so there should only be a single state_group here! self.assertEqual( len(state_group_map.keys()), 1, "Expected a single state_group to be returned by saw state_groups=%s" % (state_group_map.keys(),), ) @unittest.override_config({"experimental_features": {"msc2716_enabled": True}}) def test_sync_while_batch_importing(self) -> None: """ Make sure that /sync correctly returns full room state when a user joins during ongoing batch backfilling. See: https://github.com/matrix-org/synapse/issues/12281 """ # Create user who will be invited & join room user_id = self.register_user("beep", "test") user_tok = self.login("beep", "test") time_before_room = int(self.clock.time_msec()) # Create a room with some events room_id, _, _, _ = self._create_test_room() # Invite the user self.helper.invite( room_id, src=self.appservice.sender, tok=self.appservice.token, targ=user_id ) # Create another room, send a bunch of events to advance the stream token other_room_id = self.helper.create_room_as( self.appservice.sender, tok=self.appservice.token ) for _ in range(5): self.helper.send_event( room_id=other_room_id, type=EventTypes.Message, content={"msgtype": "m.text", "body": "C"}, tok=self.appservice.token, ) # Join the room as the normal user self.helper.join(room_id, user_id, tok=user_tok) # Create an event to hang the historical batch from - In order to see # the failure case originally reported in #12281, the historical batch # must be hung from the most recent event in the room so the base # insertion event ends up with the highest `topogological_ordering` # (`depth`) in the room but will have a negative `stream_ordering` # because it's a `historical` event. Previously, when assembling the # `state` for the `/sync` response, the bugged logic would sort by # `topological_ordering` descending and pick up the base insertion # event because it has a negative `stream_ordering` below the given # pagination token. Now we properly sort by `stream_ordering` # descending which puts `historical` events with a negative # `stream_ordering` way at the bottom and aren't selected as expected. response = self.helper.send_event( room_id=room_id, type=EventTypes.Message, content={ "msgtype": "m.text", "body": "C", }, tok=self.appservice.token, ) event_to_hang_id = response["event_id"] channel = self.make_request( "POST", "/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s" % (room_id, event_to_hang_id), content={ "events": _create_message_events_for_batch_send_request( self.virtual_user_id, time_before_room, 3 ), "state_events_at_start": _create_join_state_events_for_batch_send_request( [self.virtual_user_id], time_before_room ), }, access_token=self.appservice.token, ) self.assertEqual(channel.code, 200, channel.result) # Now we need to find the invite + join events stream tokens so we can sync between main_store = self.hs.get_datastores().main events, next_key = self.get_success( main_store.get_recent_events_for_room( room_id, 50, end_token=main_store.get_room_max_token(), ), ) invite_event_position = None for event in events: if ( event.type == "m.room.member" and event.content["membership"] == "invite" ): invite_event_position = self.get_success( main_store.get_topological_token_for_event(event.event_id) ) break assert invite_event_position is not None, "No invite event found" # Remove the topological order from the token by re-creating w/stream only invite_event_position = RoomStreamToken(None, invite_event_position.stream) # Sync everything after this token since_token = self.get_success(invite_event_position.to_string(main_store)) sync_response = self.make_request( "GET", f"/sync?since={since_token}", access_token=user_tok, ) # Assert that, for this room, the user was considered to have joined and thus # receives the full state history state_event_types = [ event["type"] for event in sync_response.json_body["rooms"]["join"][room_id]["state"][ "events" ] ] assert ( "m.room.create" in state_event_types ), "Missing room full state in sync response"