From 6a72c910f180ee8b4bd78223775af48492769472 Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Tue, 25 Jan 2022 17:11:40 +0100 Subject: [PATCH] Add admin API to get a list of federated rooms (#11658) --- changelog.d/11658.feature | 1 + .../administration/admin_api/federation.md | 60 ++++ synapse/rest/admin/__init__.py | 2 + synapse/rest/admin/federation.py | 56 ++++ .../storage/databases/main/transactions.py | 48 +++ tests/rest/admin/test_federation.py | 302 ++++++++++++++++-- 6 files changed, 444 insertions(+), 25 deletions(-) create mode 100644 changelog.d/11658.feature diff --git a/changelog.d/11658.feature b/changelog.d/11658.feature new file mode 100644 index 000000000..2ec9fb5ee --- /dev/null +++ b/changelog.d/11658.feature @@ -0,0 +1 @@ +Add an admin API to get a list of rooms that federate with a given remote homeserver. \ No newline at end of file diff --git a/docs/usage/administration/admin_api/federation.md b/docs/usage/administration/admin_api/federation.md index 5e609561a..60cbc5265 100644 --- a/docs/usage/administration/admin_api/federation.md +++ b/docs/usage/administration/admin_api/federation.md @@ -119,6 +119,66 @@ The following parameters should be set in the URL: The response fields are the same like in the `destinations` array in [List of destinations](#list-of-destinations) response. +## Destination rooms + +This API gets the rooms that federate with a specific remote server. + +The API is: + +``` +GET /_synapse/admin/v1/federation/destinations//rooms +``` + +A response body like the following is returned: + +```json +{ + "rooms":[ + { + "room_id": "!OGEhHVWSdvArJzumhm:matrix.org", + "stream_ordering": 8326 + }, + { + "room_id": "!xYvNcQPhnkrdUmYczI:matrix.org", + "stream_ordering": 93534 + } + ], + "total": 2 +} +``` + +To paginate, check for `next_token` and if present, call the endpoint again +with `from` set to the value of `next_token`. This will return a new page. + +If the endpoint does not return a `next_token` then there are no more destinations +to paginate through. + +**Parameters** + +The following parameters should be set in the URL: + +- `destination` - Name of the remote server. + +The following query parameters are available: + +- `from` - Offset in the returned list. Defaults to `0`. +- `limit` - Maximum amount of destinations to return. Defaults to `100`. +- `dir` - Direction of room order by `room_id`. Either `f` for forwards or `b` for + backwards. Defaults to `f`. + +**Response** + +The following fields are returned in the JSON response body: + +- `rooms` - An array of objects, each containing information about a room. + Room objects contain the following fields: + - `room_id` - string - The ID of the room. + - `stream_ordering` - integer - The stream ordering of the most recent + successfully-sent [PDU](understanding_synapse_through_grafana_graphs.md#federation) + to this destination in this room. +- `next_token`: string representing a positive integer - Indication for pagination. See above. +- `total` - integer - Total number of destinations. + ## Reset connection timeout Synapse makes federation requests to other homeservers. If a federation request fails, diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index b1e49d51b..9be9e33c8 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -41,6 +41,7 @@ from synapse.rest.admin.event_reports import ( EventReportsRestServlet, ) from synapse.rest.admin.federation import ( + DestinationMembershipRestServlet, DestinationResetConnectionRestServlet, DestinationRestServlet, ListDestinationsRestServlet, @@ -268,6 +269,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: ListRegistrationTokensRestServlet(hs).register(http_server) NewRegistrationTokenRestServlet(hs).register(http_server) RegistrationTokenRestServlet(hs).register(http_server) + DestinationMembershipRestServlet(hs).register(http_server) DestinationResetConnectionRestServlet(hs).register(http_server) DestinationRestServlet(hs).register(http_server) ListDestinationsRestServlet(hs).register(http_server) diff --git a/synapse/rest/admin/federation.py b/synapse/rest/admin/federation.py index 0f33f9e4d..d162e0081 100644 --- a/synapse/rest/admin/federation.py +++ b/synapse/rest/admin/federation.py @@ -148,6 +148,62 @@ class DestinationRestServlet(RestServlet): return HTTPStatus.OK, response +class DestinationMembershipRestServlet(RestServlet): + """Get list of rooms of a destination. + This needs user to have administrator access in Synapse. + + GET /_synapse/admin/v1/federation/destinations//rooms?from=0&limit=10 + + returns: + 200 OK with a list of rooms if success otherwise an error. + + The parameters `from` and `limit` are required only for pagination. + By default, a `limit` of 100 is used. + """ + + PATTERNS = admin_patterns("/federation/destinations/(?P[^/]*)/rooms$") + + def __init__(self, hs: "HomeServer"): + self._auth = hs.get_auth() + self._store = hs.get_datastore() + + async def on_GET( + self, request: SynapseRequest, destination: str + ) -> Tuple[int, JsonDict]: + await assert_requester_is_admin(self._auth, request) + + if not await self._store.is_destination_known(destination): + raise NotFoundError("Unknown destination") + + start = parse_integer(request, "from", default=0) + limit = parse_integer(request, "limit", default=100) + + if start < 0: + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "Query parameter from must be a string representing a positive integer.", + errcode=Codes.INVALID_PARAM, + ) + + if limit < 0: + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "Query parameter limit must be a string representing a positive integer.", + errcode=Codes.INVALID_PARAM, + ) + + direction = parse_string(request, "dir", default="f", allowed_values=("f", "b")) + + rooms, total = await self._store.get_destination_rooms_paginate( + destination, start, limit, direction + ) + response = {"rooms": rooms, "total": total} + if (start + limit) < total: + response["next_token"] = str(start + len(rooms)) + + return HTTPStatus.OK, response + + class DestinationResetConnectionRestServlet(RestServlet): """Reset destinations' connection timeouts and wake it up. This needs user to have administrator access in Synapse. diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 4b78b4d09..ba79e19f7 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -561,6 +561,54 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): "get_destinations_paginate_txn", get_destinations_paginate_txn ) + async def get_destination_rooms_paginate( + self, destination: str, start: int, limit: int, direction: str = "f" + ) -> Tuple[List[JsonDict], int]: + """Function to retrieve a paginated list of destination's rooms. + This will return a json list of rooms and the + total number of rooms. + + Args: + destination: the destination to query + start: start number to begin the query from + limit: number of rows to retrieve + direction: sort ascending or descending by room_id + Returns: + A tuple of a dict of rooms and a count of total rooms. + """ + + def get_destination_rooms_paginate_txn( + txn: LoggingTransaction, + ) -> Tuple[List[JsonDict], int]: + + if direction == "b": + order = "DESC" + else: + order = "ASC" + + sql = """ + SELECT COUNT(*) as total_rooms + FROM destination_rooms + WHERE destination = ? + """ + txn.execute(sql, [destination]) + count = cast(Tuple[int], txn.fetchone())[0] + + rooms = self.db_pool.simple_select_list_paginate_txn( + txn=txn, + table="destination_rooms", + orderby="room_id", + start=start, + limit=limit, + retcols=("room_id", "stream_ordering"), + order_direction=order, + ) + return rooms, count + + return await self.db_pool.runInteraction( + "get_destination_rooms_paginate_txn", get_destination_rooms_paginate_txn + ) + async def is_destination_known(self, destination: str) -> bool: """Check if a destination is known to the server.""" result = await self.db_pool.simple_select_one_onecol( diff --git a/tests/rest/admin/test_federation.py b/tests/rest/admin/test_federation.py index e2d3cff2a..71068d16c 100644 --- a/tests/rest/admin/test_federation.py +++ b/tests/rest/admin/test_federation.py @@ -20,7 +20,7 @@ from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin from synapse.api.errors import Codes -from synapse.rest.client import login +from synapse.rest.client import login, room from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util import Clock @@ -52,9 +52,7 @@ class FederationTestCase(unittest.HomeserverTestCase): ] ) def test_requester_is_no_admin(self, method: str, url: str) -> None: - """ - If the user is not a server admin, an error 403 is returned. - """ + """If the user is not a server admin, an error 403 is returned.""" self.register_user("user", "pass", admin=False) other_user_tok = self.login("user", "pass") @@ -70,9 +68,7 @@ class FederationTestCase(unittest.HomeserverTestCase): self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) def test_invalid_parameter(self) -> None: - """ - If parameters are invalid, an error is returned. - """ + """If parameters are invalid, an error is returned.""" # negative limit channel = self.make_request( @@ -135,9 +131,7 @@ class FederationTestCase(unittest.HomeserverTestCase): self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"]) def test_limit(self) -> None: - """ - Testing list of destinations with limit - """ + """Testing list of destinations with limit""" number_destinations = 20 self._create_destinations(number_destinations) @@ -155,9 +149,7 @@ class FederationTestCase(unittest.HomeserverTestCase): self._check_fields(channel.json_body["destinations"]) def test_from(self) -> None: - """ - Testing list of destinations with a defined starting point (from) - """ + """Testing list of destinations with a defined starting point (from)""" number_destinations = 20 self._create_destinations(number_destinations) @@ -175,9 +167,7 @@ class FederationTestCase(unittest.HomeserverTestCase): self._check_fields(channel.json_body["destinations"]) def test_limit_and_from(self) -> None: - """ - Testing list of destinations with a defined starting point and limit - """ + """Testing list of destinations with a defined starting point and limit""" number_destinations = 20 self._create_destinations(number_destinations) @@ -195,9 +185,7 @@ class FederationTestCase(unittest.HomeserverTestCase): self._check_fields(channel.json_body["destinations"]) def test_next_token(self) -> None: - """ - Testing that `next_token` appears at the right place - """ + """Testing that `next_token` appears at the right place""" number_destinations = 20 self._create_destinations(number_destinations) @@ -256,9 +244,7 @@ class FederationTestCase(unittest.HomeserverTestCase): self.assertNotIn("next_token", channel.json_body) def test_list_all_destinations(self) -> None: - """ - List all destinations. - """ + """List all destinations.""" number_destinations = 5 self._create_destinations(number_destinations) @@ -277,9 +263,7 @@ class FederationTestCase(unittest.HomeserverTestCase): self._check_fields(channel.json_body["destinations"]) def test_order_by(self) -> None: - """ - Testing order list with parameter `order_by` - """ + """Testing order list with parameter `order_by`""" def _order_test( expected_destination_list: List[str], @@ -543,3 +527,271 @@ class FederationTestCase(unittest.HomeserverTestCase): self.assertIn("retry_interval", c) self.assertIn("failure_ts", c) self.assertIn("last_successful_stream_ordering", c) + + +class DestinationMembershipTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + room.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastore() + self.admin_user = self.register_user("admin", "pass", admin=True) + self.admin_user_tok = self.login("admin", "pass") + + self.dest = "sub0.example.com" + self.url = f"/_synapse/admin/v1/federation/destinations/{self.dest}/rooms" + + # Record that we successfully contacted a destination in the DB. + self.get_success( + self.store.set_destination_retry_timings(self.dest, None, 0, 0) + ) + + def test_requester_is_no_admin(self) -> None: + """If the user is not a server admin, an error 403 is returned.""" + + self.register_user("user", "pass", admin=False) + other_user_tok = self.login("user", "pass") + + channel = self.make_request( + "GET", + self.url, + access_token=other_user_tok, + ) + + self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body) + self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) + + def test_invalid_parameter(self) -> None: + """If parameters are invalid, an error is returned.""" + + # negative limit + channel = self.make_request( + "GET", + self.url + "?limit=-5", + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) + self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"]) + + # negative from + channel = self.make_request( + "GET", + self.url + "?from=-5", + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) + self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"]) + + # invalid search order + channel = self.make_request( + "GET", + self.url + "?dir=bar", + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body) + self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"]) + + # invalid destination + channel = self.make_request( + "GET", + "/_synapse/admin/v1/federation/destinations/%s/rooms" % ("invalid",), + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body) + self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"]) + + def test_limit(self) -> None: + """Testing list of destinations with limit""" + + number_rooms = 5 + self._create_destination_rooms(number_rooms) + + channel = self.make_request( + "GET", + self.url + "?limit=3", + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + self.assertEqual(channel.json_body["total"], number_rooms) + self.assertEqual(len(channel.json_body["rooms"]), 3) + self.assertEqual(channel.json_body["next_token"], "3") + self._check_fields(channel.json_body["rooms"]) + + def test_from(self) -> None: + """Testing list of rooms with a defined starting point (from)""" + + number_rooms = 10 + self._create_destination_rooms(number_rooms) + + channel = self.make_request( + "GET", + self.url + "?from=5", + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + self.assertEqual(channel.json_body["total"], number_rooms) + self.assertEqual(len(channel.json_body["rooms"]), 5) + self.assertNotIn("next_token", channel.json_body) + self._check_fields(channel.json_body["rooms"]) + + def test_limit_and_from(self) -> None: + """Testing list of rooms with a defined starting point and limit""" + + number_rooms = 10 + self._create_destination_rooms(number_rooms) + + channel = self.make_request( + "GET", + self.url + "?from=3&limit=5", + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + self.assertEqual(channel.json_body["total"], number_rooms) + self.assertEqual(channel.json_body["next_token"], "8") + self.assertEqual(len(channel.json_body["rooms"]), 5) + self._check_fields(channel.json_body["rooms"]) + + def test_order_direction(self) -> None: + """Testing order list with parameter `dir`""" + number_rooms = 4 + self._create_destination_rooms(number_rooms) + + # get list in forward direction + channel_asc = self.make_request( + "GET", + self.url + "?dir=f", + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.OK, channel_asc.code, msg=channel_asc.json_body) + self.assertEqual(channel_asc.json_body["total"], number_rooms) + self.assertEqual(number_rooms, len(channel_asc.json_body["rooms"])) + self._check_fields(channel_asc.json_body["rooms"]) + + # get list in backward direction + channel_desc = self.make_request( + "GET", + self.url + "?dir=b", + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.OK, channel_desc.code, msg=channel_desc.json_body) + self.assertEqual(channel_desc.json_body["total"], number_rooms) + self.assertEqual(number_rooms, len(channel_desc.json_body["rooms"])) + self._check_fields(channel_desc.json_body["rooms"]) + + # test that both lists have different directions + for i in range(0, number_rooms): + self.assertEqual( + channel_asc.json_body["rooms"][i]["room_id"], + channel_desc.json_body["rooms"][number_rooms - 1 - i]["room_id"], + ) + + def test_next_token(self) -> None: + """Testing that `next_token` appears at the right place""" + + number_rooms = 5 + self._create_destination_rooms(number_rooms) + + # `next_token` does not appear + # Number of results is the number of entries + channel = self.make_request( + "GET", + self.url + "?limit=5", + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + self.assertEqual(channel.json_body["total"], number_rooms) + self.assertEqual(len(channel.json_body["rooms"]), number_rooms) + self.assertNotIn("next_token", channel.json_body) + + # `next_token` does not appear + # Number of max results is larger than the number of entries + channel = self.make_request( + "GET", + self.url + "?limit=6", + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + self.assertEqual(channel.json_body["total"], number_rooms) + self.assertEqual(len(channel.json_body["rooms"]), number_rooms) + self.assertNotIn("next_token", channel.json_body) + + # `next_token` does appear + # Number of max results is smaller than the number of entries + channel = self.make_request( + "GET", + self.url + "?limit=4", + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + self.assertEqual(channel.json_body["total"], number_rooms) + self.assertEqual(len(channel.json_body["rooms"]), 4) + self.assertEqual(channel.json_body["next_token"], "4") + + # Check + # Set `from` to value of `next_token` for request remaining entries + # `next_token` does not appear + channel = self.make_request( + "GET", + self.url + "?from=4", + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + self.assertEqual(channel.json_body["total"], number_rooms) + self.assertEqual(len(channel.json_body["rooms"]), 1) + self.assertNotIn("next_token", channel.json_body) + + def test_destination_rooms(self) -> None: + """Testing that request the list of rooms is successfully.""" + number_rooms = 3 + self._create_destination_rooms(number_rooms) + + channel = self.make_request( + "GET", + self.url, + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + self.assertEqual(channel.json_body["total"], number_rooms) + self.assertEqual(number_rooms, len(channel.json_body["rooms"])) + self._check_fields(channel.json_body["rooms"]) + + def _create_destination_rooms(self, number_rooms: int) -> None: + """Create a number rooms for destination + + Args: + number_rooms: Number of rooms to be created + """ + for _ in range(0, number_rooms): + room_id = self.helper.create_room_as( + self.admin_user, tok=self.admin_user_tok + ) + self.get_success( + self.store.store_destination_rooms_entries((self.dest,), room_id, 1234) + ) + + def _check_fields(self, content: List[JsonDict]) -> None: + """Checks that the expected room attributes are present in content + + Args: + content: List that is checked for content + """ + for c in content: + self.assertIn("room_id", c) + self.assertIn("stream_ordering", c)