Add Admin API to Fetch Messages Within a Particular Window (#13672)

This adds two new admin APIs that allow us to fetch messages from a room within a particular time.
This commit is contained in:
Connor Davis 2022-09-07 05:54:44 -04:00 committed by GitHub
parent 26bc26586b
commit bb5b47b62a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 435 additions and 14 deletions

View File

@ -0,0 +1 @@
Add admin APIs to fetch messages within a particular window of time.

View File

@ -393,6 +393,151 @@ A response body like the following is returned:
} }
``` ```
# Room Messages API
The Room Messages admin API allows server admins to get all messages
sent to a room in a given timeframe. There are various parameters available
that allow for filtering and ordering the returned list. This API supports pagination.
To use it, you will need to authenticate by providing an `access_token`
for a server admin: see [Admin API](../usage/administration/admin_api).
This endpoint mirrors the [Matrix Spec defined Messages API](https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3roomsroomidmessages).
The API is:
```
GET /_synapse/admin/v1/rooms/<room_id>/messages
```
**Parameters**
The following path parameters are required:
* `room_id` - The ID of the room you wish you fetch messages from.
The following query parameters are available:
* `from` (required) - The token to start returning events from. This token can be obtained from a prev_batch
or next_batch token returned by the /sync endpoint, or from an end token returned by a previous request to this endpoint.
* `to` - The token to spot returning events at.
* `limit` - The maximum number of events to return. Defaults to `10`.
* `filter` - A JSON RoomEventFilter to filter returned events with.
* `dir` - The direction to return events from. Either `f` for forwards or `b` for backwards. Setting
this value to `b` will reverse the above sort order. Defaults to `f`.
**Response**
The following fields are possible in the JSON response body:
* `chunk` - A list of room events. The order depends on the dir parameter.
Note that an empty chunk does not necessarily imply that no more events are available. Clients should continue to paginate until no end property is returned.
* `end` - A token corresponding to the end of chunk. This token can be passed back to this endpoint to request further events.
If no further events are available, this property is omitted from the response.
* `start` - A token corresponding to the start of chunk.
* `state` - A list of state events relevant to showing the chunk.
**Example**
For more details on each chunk, read [the Matrix specification](https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3roomsroomidmessages).
```json
{
"chunk": [
{
"content": {
"body": "This is an example text message",
"format": "org.matrix.custom.html",
"formatted_body": "<b>This is an example text message</b>",
"msgtype": "m.text"
},
"event_id": "$143273582443PhrSn:example.org",
"origin_server_ts": 1432735824653,
"room_id": "!636q39766251:example.com",
"sender": "@example:example.org",
"type": "m.room.message",
"unsigned": {
"age": 1234
}
},
{
"content": {
"name": "The room name"
},
"event_id": "$143273582443PhrSn:example.org",
"origin_server_ts": 1432735824653,
"room_id": "!636q39766251:example.com",
"sender": "@example:example.org",
"state_key": "",
"type": "m.room.name",
"unsigned": {
"age": 1234
}
},
{
"content": {
"body": "Gangnam Style",
"info": {
"duration": 2140786,
"h": 320,
"mimetype": "video/mp4",
"size": 1563685,
"thumbnail_info": {
"h": 300,
"mimetype": "image/jpeg",
"size": 46144,
"w": 300
},
"thumbnail_url": "mxc://example.org/FHyPlCeYUSFFxlgbQYZmoEoe",
"w": 480
},
"msgtype": "m.video",
"url": "mxc://example.org/a526eYUSFFxlgbQYZmo442"
},
"event_id": "$143273582443PhrSn:example.org",
"origin_server_ts": 1432735824653,
"room_id": "!636q39766251:example.com",
"sender": "@example:example.org",
"type": "m.room.message",
"unsigned": {
"age": 1234
}
}
],
"end": "t47409-4357353_219380_26003_2265",
"start": "t47429-4392820_219380_26003_2265"
}
```
# Room Timestamp to Event API
The Room Timestamp to Event API endpoint fetches the `event_id` of the closest event to the given
timestamp (`ts` query parameter) in the given direction (`dir` query parameter).
Useful for cases like jump to date so you can start paginating messages from
a given date in the archive.
The API is:
```
GET /_synapse/admin/v1/rooms/<room_id>/timestamp_to_event
```
**Parameters**
The following path parameters are required:
* `room_id` - The ID of the room you wish to check.
The following query parameters are available:
* `ts` - a timestamp in milliseconds where we will find the closest event in
the given direction.
* `dir` - can be `f` or `b` to indicate forwards and backwards in time from the
given timestamp. Defaults to `f`.
**Response**
* `event_id` - converted from timestamp
# Block Room API # Block Room API
The Block Room admin API allows server admins to block and unblock rooms, The Block Room admin API allows server admins to block and unblock rooms,
and query to see if a given room is blocked. and query to see if a given room is blocked.

View File

@ -26,6 +26,7 @@ from synapse.events.utils import SerializeEventConfig
from synapse.handlers.room import ShutdownRoomResponse from synapse.handlers.room import ShutdownRoomResponse
from synapse.logging.opentracing import trace from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.admin._base import assert_user_is_admin
from synapse.storage.state import StateFilter from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester, StreamKeyType from synapse.types import JsonDict, Requester, StreamKeyType
@ -423,6 +424,7 @@ class PaginationHandler:
pagin_config: PaginationConfig, pagin_config: PaginationConfig,
as_client_event: bool = True, as_client_event: bool = True,
event_filter: Optional[Filter] = None, event_filter: Optional[Filter] = None,
use_admin_priviledge: bool = False,
) -> JsonDict: ) -> JsonDict:
"""Get messages in a room. """Get messages in a room.
@ -432,10 +434,16 @@ class PaginationHandler:
pagin_config: The pagination config rules to apply, if any. pagin_config: The pagination config rules to apply, if any.
as_client_event: True to get events in client-server format. as_client_event: True to get events in client-server format.
event_filter: Filter to apply to results or None event_filter: Filter to apply to results or None
use_admin_priviledge: if `True`, return all events, regardless
of whether `user` has access to them. To be used **ONLY**
from the admin API.
Returns: Returns:
Pagination API results Pagination API results
""" """
if use_admin_priviledge:
await assert_user_is_admin(self.auth, requester)
user_id = requester.user.to_string() user_id = requester.user.to_string()
if pagin_config.from_token: if pagin_config.from_token:
@ -458,12 +466,14 @@ class PaginationHandler:
room_token = from_token.room_key room_token = from_token.room_key
async with self.pagination_lock.read(room_id): async with self.pagination_lock.read(room_id):
( (membership, member_event_id) = (None, None)
membership, if not use_admin_priviledge:
member_event_id, (
) = await self.auth.check_user_in_room_or_world_readable( membership,
room_id, requester, allow_departed_users=True member_event_id,
) ) = await self.auth.check_user_in_room_or_world_readable(
room_id, requester, allow_departed_users=True
)
if pagin_config.direction == "b": if pagin_config.direction == "b":
# if we're going backwards, we might need to backfill. This # if we're going backwards, we might need to backfill. This
@ -475,7 +485,7 @@ class PaginationHandler:
room_id, room_token.stream room_id, room_token.stream
) )
if membership == Membership.LEAVE: if not use_admin_priviledge and membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before # If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the # they left the room, to save the effort of loading from the
# database. # database.
@ -528,12 +538,13 @@ class PaginationHandler:
if event_filter: if event_filter:
events = await event_filter.filter(events) events = await event_filter.filter(events)
events = await filter_events_for_client( if not use_admin_priviledge:
self._storage_controllers, events = await filter_events_for_client(
user_id, self._storage_controllers,
events, user_id,
is_peeking=(member_event_id is None), events,
) is_peeking=(member_event_id is None),
)
# if after the filter applied there are no more events # if after the filter applied there are no more events
# return immediately - but there might be more in next_token batch # return immediately - but there might be more in next_token batch

View File

@ -61,9 +61,11 @@ from synapse.rest.admin.rooms import (
MakeRoomAdminRestServlet, MakeRoomAdminRestServlet,
RoomEventContextServlet, RoomEventContextServlet,
RoomMembersRestServlet, RoomMembersRestServlet,
RoomMessagesRestServlet,
RoomRestServlet, RoomRestServlet,
RoomRestV2Servlet, RoomRestV2Servlet,
RoomStateRestServlet, RoomStateRestServlet,
RoomTimestampToEventRestServlet,
) )
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.rest.admin.statistics import UserMediaStatisticsRestServlet from synapse.rest.admin.statistics import UserMediaStatisticsRestServlet
@ -271,6 +273,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
DestinationResetConnectionRestServlet(hs).register(http_server) DestinationResetConnectionRestServlet(hs).register(http_server)
DestinationRestServlet(hs).register(http_server) DestinationRestServlet(hs).register(http_server)
ListDestinationsRestServlet(hs).register(http_server) ListDestinationsRestServlet(hs).register(http_server)
RoomMessagesRestServlet(hs).register(http_server)
RoomTimestampToEventRestServlet(hs).register(http_server)
# Some servlets only get registered for the main process. # Some servlets only get registered for the main process.
if hs.config.worker.worker_app is None: if hs.config.worker.worker_app is None:

View File

@ -35,6 +35,7 @@ from synapse.rest.admin._base import (
) )
from synapse.storage.databases.main.room import RoomSortOrder from synapse.storage.databases.main.room import RoomSortOrder
from synapse.storage.state import StateFilter from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, RoomID, UserID, create_requester from synapse.types import JsonDict, RoomID, UserID, create_requester
from synapse.util import json_decoder from synapse.util import json_decoder
@ -858,3 +859,106 @@ class BlockRoomRestServlet(RestServlet):
await self._store.unblock_room(room_id) await self._store.unblock_room(room_id)
return HTTPStatus.OK, {"block": block} return HTTPStatus.OK, {"block": block}
class RoomMessagesRestServlet(RestServlet):
"""
Get messages list of a room.
"""
PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)/messages$")
def __init__(self, hs: "HomeServer"):
self._hs = hs
self._clock = hs.get_clock()
self._pagination_handler = hs.get_pagination_handler()
self._auth = hs.get_auth()
self._store = hs.get_datastores().main
async def on_GET(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester)
pagination_config = await PaginationConfig.from_request(
self._store, request, default_limit=10
)
# Twisted will have processed the args by now.
assert request.args is not None
as_client_event = b"raw" not in request.args
filter_str = parse_string(request, "filter", encoding="utf-8")
if filter_str:
filter_json = urlparse.unquote(filter_str)
event_filter: Optional[Filter] = Filter(
self._hs, json_decoder.decode(filter_json)
)
if (
event_filter
and event_filter.filter_json.get("event_format", "client")
== "federation"
):
as_client_event = False
else:
event_filter = None
msgs = await self._pagination_handler.get_messages(
room_id=room_id,
requester=requester,
pagin_config=pagination_config,
as_client_event=as_client_event,
event_filter=event_filter,
use_admin_priviledge=True,
)
return HTTPStatus.OK, msgs
class RoomTimestampToEventRestServlet(RestServlet):
"""
API endpoint to fetch the `event_id` of the closest event to the given
timestamp (`ts` query parameter) in the given direction (`dir` query
parameter).
Useful for cases like jump to date so you can start paginating messages from
a given date in the archive.
`ts` is a timestamp in milliseconds where we will find the closest event in
the given direction.
`dir` can be `f` or `b` to indicate forwards and backwards in time from the
given timestamp.
GET /_synapse/admin/v1/rooms/<roomID>/timestamp_to_event?ts=<timestamp>&dir=<direction>
{
"event_id": ...
}
"""
PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)/timestamp_to_event$")
def __init__(self, hs: "HomeServer"):
self._auth = hs.get_auth()
self._store = hs.get_datastores().main
self._timestamp_lookup_handler = hs.get_timestamp_lookup_handler()
async def on_GET(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester)
timestamp = parse_integer(request, "ts", required=True)
direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"])
(
event_id,
origin_server_ts,
) = await self._timestamp_lookup_handler.get_event_for_timestamp(
requester, room_id, timestamp, direction
)
return HTTPStatus.OK, {
"event_id": event_id,
"origin_server_ts": origin_server_ts,
}

View File

@ -11,6 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import json
import time
import urllib.parse import urllib.parse
from typing import List, Optional from typing import List, Optional
from unittest.mock import Mock from unittest.mock import Mock
@ -22,10 +24,11 @@ from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin import synapse.rest.admin
from synapse.api.constants import EventTypes, Membership, RoomTypes from synapse.api.constants import EventTypes, Membership, RoomTypes
from synapse.api.errors import Codes from synapse.api.errors import Codes
from synapse.handlers.pagination import PaginationHandler from synapse.handlers.pagination import PaginationHandler, PurgeStatus
from synapse.rest.client import directory, events, login, room from synapse.rest.client import directory, events, login, room
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.util import Clock from synapse.util import Clock
from synapse.util.stringutils import random_string
from tests import unittest from tests import unittest
@ -1793,6 +1796,159 @@ class RoomTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
class RoomMessagesTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
room.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
self.user = self.register_user("foo", "pass")
self.user_tok = self.login("foo", "pass")
self.room_id = self.helper.create_room_as(self.user, tok=self.user_tok)
def test_timestamp_to_event(self) -> None:
"""Test that providing the current timestamp can get the last event."""
self.helper.send(self.room_id, body="message 1", tok=self.user_tok)
second_event_id = self.helper.send(
self.room_id, body="message 2", tok=self.user_tok
)["event_id"]
ts = str(round(time.time() * 1000))
channel = self.make_request(
"GET",
"/_synapse/admin/v1/rooms/%s/timestamp_to_event?dir=b&ts=%s"
% (self.room_id, ts),
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code)
self.assertIn("event_id", channel.json_body)
self.assertEqual(second_event_id, channel.json_body["event_id"])
def test_topo_token_is_accepted(self) -> None:
"""Test Topo Token is accepted."""
token = "t1-0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET",
"/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code)
self.assertIn("start", channel.json_body)
self.assertEqual(token, channel.json_body["start"])
self.assertIn("chunk", channel.json_body)
self.assertIn("end", channel.json_body)
def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
"""Test that stream token is accepted for forward pagination."""
token = "s0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET",
"/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code)
self.assertIn("start", channel.json_body)
self.assertEqual(token, channel.json_body["start"])
self.assertIn("chunk", channel.json_body)
self.assertIn("end", channel.json_body)
def test_room_messages_purge(self) -> None:
"""Test room messages can be retrieved by an admin that isn't in the room."""
store = self.hs.get_datastores().main
pagination_handler = self.hs.get_pagination_handler()
# Send a first message in the room, which will be removed by the purge.
first_event_id = self.helper.send(
self.room_id, body="message 1", tok=self.user_tok
)["event_id"]
first_token = self.get_success(
store.get_topological_token_for_event(first_event_id)
)
first_token_str = self.get_success(first_token.to_string(store))
# Send a second message in the room, which won't be removed, and which we'll
# use as the marker to purge events before.
second_event_id = self.helper.send(
self.room_id, body="message 2", tok=self.user_tok
)["event_id"]
second_token = self.get_success(
store.get_topological_token_for_event(second_event_id)
)
second_token_str = self.get_success(second_token.to_string(store))
# Send a third event in the room to ensure we don't fall under any edge case
# due to our marker being the latest forward extremity in the room.
self.helper.send(self.room_id, body="message 3", tok=self.user_tok)
# Check that we get the first and second message when querying /messages.
channel = self.make_request(
"GET",
"/_synapse/admin/v1/rooms/%s/messages?from=%s&dir=b&filter=%s"
% (
self.room_id,
second_token_str,
json.dumps({"types": [EventTypes.Message]}),
),
access_token=self.admin_user_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
chunk = channel.json_body["chunk"]
self.assertEqual(len(chunk), 2, [event["content"] for event in chunk])
# Purge every event before the second event.
purge_id = random_string(16)
pagination_handler._purges_by_id[purge_id] = PurgeStatus()
self.get_success(
pagination_handler._purge_history(
purge_id=purge_id,
room_id=self.room_id,
token=second_token_str,
delete_local_events=True,
)
)
# Check that we only get the second message through /message now that the first
# has been purged.
channel = self.make_request(
"GET",
"/_synapse/admin/v1/rooms/%s/messages?from=%s&dir=b&filter=%s"
% (
self.room_id,
second_token_str,
json.dumps({"types": [EventTypes.Message]}),
),
access_token=self.admin_user_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
chunk = channel.json_body["chunk"]
self.assertEqual(len(chunk), 1, [event["content"] for event in chunk])
# Check that we get no event, but also no error, when querying /messages with
# the token that was pointing at the first event, because we don't have it
# anymore.
channel = self.make_request(
"GET",
"/_synapse/admin/v1/rooms/%s/messages?from=%s&dir=b&filter=%s"
% (
self.room_id,
first_token_str,
json.dumps({"types": [EventTypes.Message]}),
),
access_token=self.admin_user_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
chunk = channel.json_body["chunk"]
self.assertEqual(len(chunk), 0, [event["content"] for event in chunk])
class JoinAliasRoomTestCase(unittest.HomeserverTestCase): class JoinAliasRoomTestCase(unittest.HomeserverTestCase):
servlets = [ servlets = [