From 7d8f0ef351e99adf602b3acb67b2516a02ff6918 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 4 Jun 2024 12:58:03 -0500 Subject: [PATCH] Use fully-qualified `PersistedEventPosition` when returning `RoomsForUser` (#17265) Use fully-qualified `PersistedEventPosition` (`instance_name` and `stream_ordering`) when returning `RoomsForUser` to facilitate proper comparisons and `RoomStreamToken` generation. Spawning from https://github.com/element-hq/synapse/pull/17187 where we want to utilize this change --- changelog.d/17265.misc | 1 + synapse/federation/federation_server.py | 4 +- synapse/handlers/admin.py | 10 +--- synapse/handlers/initial_sync.py | 2 +- synapse/handlers/pagination.py | 3 +- synapse/handlers/room.py | 60 +------------------- synapse/handlers/sync.py | 2 +- synapse/storage/databases/main/roommember.py | 14 ++++- synapse/storage/roommember.py | 2 +- synapse/types/__init__.py | 57 +++++++++++++++++++ tests/replication/storage/test_events.py | 5 +- 11 files changed, 85 insertions(+), 75 deletions(-) create mode 100644 changelog.d/17265.misc diff --git a/changelog.d/17265.misc b/changelog.d/17265.misc new file mode 100644 index 000000000..e6d4d8b4e --- /dev/null +++ b/changelog.d/17265.misc @@ -0,0 +1 @@ +Use fully-qualified `PersistedEventPosition` when returning `RoomsForUser` to facilitate proper comparisons and `RoomStreamToken` generation. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 7ffc650aa..1932fa82a 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -674,7 +674,7 @@ class FederationServer(FederationBase): # This is in addition to the HS-level rate limiting applied by # BaseFederationServlet. # type-ignore: mypy doesn't seem able to deduce the type of the limiter(!?) - await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type] + await self._room_member_handler._join_rate_per_room_limiter.ratelimit( requester=None, key=room_id, update=False, @@ -717,7 +717,7 @@ class FederationServer(FederationBase): SynapseTags.SEND_JOIN_RESPONSE_IS_PARTIAL_STATE, caller_supports_partial_state, ) - await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type] + await self._room_member_handler._join_rate_per_room_limiter.ratelimit( requester=None, key=room_id, update=False, diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 702d40332..21d3bb37f 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -126,13 +126,7 @@ class AdminHandler: # Get all rooms the user is in or has been in rooms = await self._store.get_rooms_for_local_user_where_membership_is( user_id, - membership_list=( - Membership.JOIN, - Membership.LEAVE, - Membership.BAN, - Membership.INVITE, - Membership.KNOCK, - ), + membership_list=Membership.LIST, ) # We only try and fetch events for rooms the user has been in. If @@ -179,7 +173,7 @@ class AdminHandler: if room.membership == Membership.JOIN: stream_ordering = self._store.get_room_max_stream_ordering() else: - stream_ordering = room.stream_ordering + stream_ordering = room.event_pos.stream from_key = RoomStreamToken(topological=0, stream=0) to_key = RoomStreamToken(stream=stream_ordering) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index d99fc4bec..84d6fecf3 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -199,7 +199,7 @@ class InitialSyncHandler: ) elif event.membership == Membership.LEAVE: room_end_token = RoomStreamToken( - stream=event.stream_ordering, + stream=event.event_pos.stream, ) deferred_room_state = run_in_background( self._state_storage_controller.get_state_for_events, diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 6617105cd..f7447b8ba 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -27,7 +27,6 @@ from synapse.api.constants import Direction, EventTypes, Membership from synapse.api.errors import SynapseError from synapse.api.filtering import Filter from synapse.events.utils import SerializeEventConfig -from synapse.handlers.room import ShutdownRoomParams, ShutdownRoomResponse from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME from synapse.logging.opentracing import trace from synapse.metrics.background_process_metrics import run_as_background_process @@ -38,6 +37,8 @@ from synapse.types import ( JsonMapping, Requester, ScheduledTask, + ShutdownRoomParams, + ShutdownRoomResponse, StreamKeyType, TaskStatus, ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 51739a265..7f1b674d1 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -40,7 +40,6 @@ from typing import ( ) import attr -from typing_extensions import TypedDict import synapse.events.snapshot from synapse.api.constants import ( @@ -81,6 +80,8 @@ from synapse.types import ( RoomAlias, RoomID, RoomStreamToken, + ShutdownRoomParams, + ShutdownRoomResponse, StateMap, StrCollection, StreamKeyType, @@ -1780,63 +1781,6 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]): return self.store.get_current_room_stream_token_for_room_id(room_id) -class ShutdownRoomParams(TypedDict): - """ - Attributes: - requester_user_id: - User who requested the action. Will be recorded as putting the room on the - blocking list. - new_room_user_id: - If set, a new room will be created with this user ID - as the creator and admin, and all users in the old room will be - moved into that room. If not set, no new room will be created - and the users will just be removed from the old room. - new_room_name: - A string representing the name of the room that new users will - be invited to. Defaults to `Content Violation Notification` - message: - A string containing the first message that will be sent as - `new_room_user_id` in the new room. Ideally this will clearly - convey why the original room was shut down. - Defaults to `Sharing illegal content on this server is not - permitted and rooms in violation will be blocked.` - block: - If set to `true`, this room will be added to a blocking list, - preventing future attempts to join the room. Defaults to `false`. - purge: - If set to `true`, purge the given room from the database. - force_purge: - If set to `true`, the room will be purged from database - even if there are still users joined to the room. - """ - - requester_user_id: Optional[str] - new_room_user_id: Optional[str] - new_room_name: Optional[str] - message: Optional[str] - block: bool - purge: bool - force_purge: bool - - -class ShutdownRoomResponse(TypedDict): - """ - Attributes: - kicked_users: An array of users (`user_id`) that were kicked. - failed_to_kick_users: - An array of users (`user_id`) that that were not kicked. - local_aliases: - An array of strings representing the local aliases that were - migrated from the old room to the new. - new_room_id: A string representing the room ID of the new room. - """ - - kicked_users: List[str] - failed_to_kick_users: List[str] - local_aliases: List[str] - new_room_id: Optional[str] - - class RoomShutdownHandler: DEFAULT_MESSAGE = ( "Sharing illegal content on this server is not permitted and rooms in" diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 1d7d9dfdd..e815e0ea7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -2805,7 +2805,7 @@ class SyncHandler: continue leave_token = now_token.copy_and_replace( - StreamKeyType.ROOM, RoomStreamToken(stream=event.stream_ordering) + StreamKeyType.ROOM, RoomStreamToken(stream=event.event_pos.stream) ) room_entries.append( RoomSyncResultBuilder( diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 9fddbb2ca..d8b54dc4e 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -476,7 +476,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): ) sql = """ - SELECT room_id, e.sender, c.membership, event_id, e.stream_ordering, r.room_version + SELECT room_id, e.sender, c.membership, event_id, e.instance_name, e.stream_ordering, r.room_version FROM local_current_membership AS c INNER JOIN events AS e USING (room_id, event_id) INNER JOIN rooms AS r USING (room_id) @@ -488,7 +488,17 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): ) txn.execute(sql, (user_id, *args)) - results = [RoomsForUser(*r) for r in txn] + results = [ + RoomsForUser( + room_id=room_id, + sender=sender, + membership=membership, + event_id=event_id, + event_pos=PersistedEventPosition(instance_name, stream_ordering), + room_version_id=room_version, + ) + for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn + ] return results diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 7471f81a1..80c963086 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -35,7 +35,7 @@ class RoomsForUser: sender: str membership: str event_id: str - stream_ordering: int + event_pos: PersistedEventPosition room_version_id: str diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 151658df5..3a89787ca 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -1279,3 +1279,60 @@ class ScheduledTask: result: Optional[JsonMapping] # Optional error that should be assigned a value when the status is FAILED error: Optional[str] + + +class ShutdownRoomParams(TypedDict): + """ + Attributes: + requester_user_id: + User who requested the action. Will be recorded as putting the room on the + blocking list. + new_room_user_id: + If set, a new room will be created with this user ID + as the creator and admin, and all users in the old room will be + moved into that room. If not set, no new room will be created + and the users will just be removed from the old room. + new_room_name: + A string representing the name of the room that new users will + be invited to. Defaults to `Content Violation Notification` + message: + A string containing the first message that will be sent as + `new_room_user_id` in the new room. Ideally this will clearly + convey why the original room was shut down. + Defaults to `Sharing illegal content on this server is not + permitted and rooms in violation will be blocked.` + block: + If set to `true`, this room will be added to a blocking list, + preventing future attempts to join the room. Defaults to `false`. + purge: + If set to `true`, purge the given room from the database. + force_purge: + If set to `true`, the room will be purged from database + even if there are still users joined to the room. + """ + + requester_user_id: Optional[str] + new_room_user_id: Optional[str] + new_room_name: Optional[str] + message: Optional[str] + block: bool + purge: bool + force_purge: bool + + +class ShutdownRoomResponse(TypedDict): + """ + Attributes: + kicked_users: An array of users (`user_id`) that were kicked. + failed_to_kick_users: + An array of users (`user_id`) that that were not kicked. + local_aliases: + An array of strings representing the local aliases that were + migrated from the old room to the new. + new_room_id: A string representing the room ID of the new room. + """ + + kicked_users: List[str] + failed_to_kick_users: List[str] + local_aliases: List[str] + new_room_id: Optional[str] diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py index 86c8f14d1..4e41a1c91 100644 --- a/tests/replication/storage/test_events.py +++ b/tests/replication/storage/test_events.py @@ -154,7 +154,10 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase): USER_ID, "invite", event.event_id, - event.internal_metadata.stream_ordering, + PersistedEventPosition( + self.hs.get_instance_name(), + event.internal_metadata.stream_ordering, + ), RoomVersions.V1.identifier, ) ],