Add an Admin API endpoint to redact all a user's events (#17506)

This commit is contained in:
Shay 2024-09-18 03:08:01 -07:00 committed by GitHub
parent 8881ad6d4b
commit 51dd4df0a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 712 additions and 5 deletions

View File

@ -0,0 +1,2 @@
Add an asynchronous Admin API endpoint [to redact all a user's events](https://element-hq.github.io/synapse/v1.116/admin_api/user_admin_api.html#redact-all-the-events-of-a-user),
and [an endpoint to check on the status of that redaction task](https://element-hq.github.io/synapse/v1.116/admin_api/user_admin_api.html#check-the-status-of-a-redaction-process).

View File

@ -1361,3 +1361,83 @@ Returns a `404` HTTP status code if no user was found, with a response body like
```
_Added in Synapse 1.72.0._
## Redact all the events of a user
The API is
```
POST /_synapse/admin/v1/user/$user_id/redact
{
"rooms": ["!roomid1", "!roomid2"]
}
```
If an empty list is provided as the key for `rooms`, all events in all the rooms the user is member of will be redacted,
otherwise all the events in the rooms provided in the request will be redacted.
The API starts redaction process running, and returns immediately with a JSON body with
a redact id which can be used to query the status of the redaction process:
```json
{
"redact_id": "<opaque id>"
}
```
**Parameters**
The following parameters should be set in the URL:
- `user_id` - The fully qualified MXID of the user: for example, `@user:server.com`.
The following JSON body parameter must be provided:
- `rooms` - A list of rooms to redact the user's events in. If an empty list is provided all events in all rooms
the user is a member of will be redacted
_Added in Synapse 1.116.0._
The following JSON body parameters are optional:
- `reason` - Reason the redaction is being requested, ie "spam", "abuse", etc. This will be included in each redaction event, and be visible to users.
- `limit` - a limit on the number of the user's events to search for ones that can be redacted (events are redacted newest to oldest) in each room, defaults to 1000 if not provided
## Check the status of a redaction process
It is possible to query the status of the background task for redacting a user's events.
The status can be queried up to 24 hours after completion of the task,
or until Synapse is restarted (whichever happens first).
The API is:
```
GET /_synapse/admin/v1/user/redact_status/$redact_id
```
A response body like the following is returned:
```
{
"status": "active",
"failed_redactions": [],
}
```
**Parameters**
The following parameters should be set in the URL:
* `redact_id` - string - The ID for this redaction process, provided when the redaction was requested.
**Response**
The following fields are returned in the JSON response body:
- `status` - string - one of scheduled/active/completed/failed, indicating the status of the redaction job
- `failed_redactions` - dictionary - the keys of the dict are event ids the process was unable to redact, if any, and the values are
the corresponding error that caused the redaction to fail
_Added in Synapse 1.116.0._

View File

@ -21,13 +21,34 @@
import abc
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Set
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
)
import attr
from synapse.api.constants import Direction, Membership
from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.events import EventBase
from synapse.types import JsonMapping, RoomStreamToken, StateMap, UserID, UserInfo
from synapse.types import (
JsonMapping,
Requester,
RoomStreamToken,
ScheduledTask,
StateMap,
TaskStatus,
UserID,
UserInfo,
create_requester,
)
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
@ -35,6 +56,8 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
REDACT_ALL_EVENTS_ACTION_NAME = "redact_all_events"
class AdminHandler:
def __init__(self, hs: "HomeServer"):
@ -43,6 +66,20 @@ class AdminHandler:
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state
self._msc3866_enabled = hs.config.experimental.msc3866.enabled
self.event_creation_handler = hs.get_event_creation_handler()
self._task_scheduler = hs.get_task_scheduler()
self._task_scheduler.register_action(
self._redact_all_events, REDACT_ALL_EVENTS_ACTION_NAME
)
async def get_redact_task(self, redact_id: str) -> Optional[ScheduledTask]:
"""Get the current status of an active redaction process
Args:
redact_id: redact_id returned by start_redact_events.
"""
return await self._task_scheduler.get_task(redact_id)
async def get_whois(self, user: UserID) -> JsonMapping:
connections = []
@ -313,6 +350,153 @@ class AdminHandler:
return writer.finished()
async def start_redact_events(
self,
user_id: str,
rooms: list,
requester: JsonMapping,
reason: Optional[str],
limit: Optional[int],
) -> str:
"""
Start a task redacting the events of the given user in the given rooms
Args:
user_id: the user ID of the user whose events should be redacted
rooms: the rooms in which to redact the user's events
requester: the user requesting the events
reason: reason for requesting the redaction, ie spam, etc
limit: limit on the number of events in each room to redact
Returns:
a unique ID which can be used to query the status of the task
"""
active_tasks = await self._task_scheduler.get_tasks(
actions=[REDACT_ALL_EVENTS_ACTION_NAME],
resource_id=user_id,
statuses=[TaskStatus.ACTIVE],
)
if len(active_tasks) > 0:
raise SynapseError(
400, "Redact already in progress for user %s" % (user_id,)
)
if not limit:
limit = 1000
redact_id = await self._task_scheduler.schedule_task(
REDACT_ALL_EVENTS_ACTION_NAME,
resource_id=user_id,
params={
"rooms": rooms,
"requester": requester,
"user_id": user_id,
"reason": reason,
"limit": limit,
},
)
logger.info(
"starting redact events with redact_id %s",
redact_id,
)
return redact_id
async def _redact_all_events(
self, task: ScheduledTask
) -> Tuple[TaskStatus, Optional[Mapping[str, Any]], Optional[str]]:
"""
Task to redact all of a users events in the given rooms, tracking which, if any, events
whose redaction failed
"""
assert task.params is not None
rooms = task.params.get("rooms")
assert rooms is not None
r = task.params.get("requester")
assert r is not None
admin = Requester.deserialize(self._store, r)
user_id = task.params.get("user_id")
assert user_id is not None
requester = create_requester(
user_id, authenticated_entity=admin.user.to_string()
)
reason = task.params.get("reason")
limit = task.params.get("limit")
assert limit is not None
result: Mapping[str, Any] = (
task.result if task.result else {"failed_redactions": {}}
)
for room in rooms:
room_version = await self._store.get_room_version(room)
event_ids = await self._store.get_events_sent_by_user_in_room(
user_id,
room,
limit,
["m.room.member", "m.room.message"],
)
if not event_ids:
# there's nothing to redact
return TaskStatus.COMPLETE, result, None
events = await self._store.get_events_as_list(event_ids)
for event in events:
# we care about join events but not other membership events
if event.type == "m.room.member":
content = event.content
if content:
if content.get("membership") == Membership.JOIN:
pass
else:
continue
relations = await self._store.get_relations_for_event(
room, event.event_id, event, event_type=EventTypes.Redaction
)
# if we've already successfully redacted this event then skip processing it
if relations[0]:
continue
event_dict = {
"type": EventTypes.Redaction,
"content": {"reason": reason} if reason else {},
"room_id": room,
"sender": user_id,
}
if room_version.updated_redaction_rules:
event_dict["content"]["redacts"] = event.event_id
else:
event_dict["redacts"] = event.event_id
try:
# set the prev event to the offending message to allow for redactions
# to be processed in the case where the user has been kicked/banned before
# redactions are requested
(
redaction,
_,
) = await self.event_creation_handler.create_and_send_nonmember_event(
requester,
event_dict,
prev_event_ids=[event.event_id],
ratelimit=False,
)
except Exception as ex:
logger.info(
f"Redaction of event {event.event_id} failed due to: {ex}"
)
result["failed_redactions"][event.event_id] = str(ex)
await self._task_scheduler.update_task(task.id, result=result)
return TaskStatus.COMPLETE, result, None
class ExfiltrationWriter(metaclass=abc.ABCMeta):
"""Interface used to specify how to write exported data."""

View File

@ -98,6 +98,8 @@ from synapse.rest.admin.users import (
DeactivateAccountRestServlet,
PushersRestServlet,
RateLimitRestServlet,
RedactUser,
RedactUserStatus,
ResetPasswordRestServlet,
SearchUsersRestServlet,
ShadowBanRestServlet,
@ -319,6 +321,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
UserReplaceMasterCrossSigningKeyRestServlet(hs).register(http_server)
UserByExternalId(hs).register(http_server)
UserByThreePid(hs).register(http_server)
RedactUser(hs).register(http_server)
RedactUserStatus(hs).register(http_server)
DeviceRestServlet(hs).register(http_server)
DevicesRestServlet(hs).register(http_server)

View File

@ -50,7 +50,7 @@ from synapse.rest.admin._base import (
from synapse.rest.client._base import client_patterns
from synapse.storage.databases.main.registration import ExternalIDReuseException
from synapse.storage.databases.main.stats import UserSortOrder
from synapse.types import JsonDict, JsonMapping, UserID
from synapse.types import JsonDict, JsonMapping, TaskStatus, UserID
from synapse.types.rest import RequestBodyModel
if TYPE_CHECKING:
@ -1405,3 +1405,100 @@ class UserByThreePid(RestServlet):
raise NotFoundError("User not found")
return HTTPStatus.OK, {"user_id": user_id}
class RedactUser(RestServlet):
"""
Redact all the events of a given user in the given rooms or if empty dict is provided
then all events in all rooms user is member of. Kicks off a background process and
returns an id that can be used to check on the progress of the redaction progress
"""
PATTERNS = admin_patterns("/user/(?P<user_id>[^/]*)/redact")
def __init__(self, hs: "HomeServer"):
self._auth = hs.get_auth()
self._store = hs.get_datastores().main
self.admin_handler = hs.get_admin_handler()
async def on_POST(
self, request: SynapseRequest, user_id: str
) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester)
body = parse_json_object_from_request(request, allow_empty_body=True)
rooms = body.get("rooms")
if rooms is None:
raise SynapseError(
HTTPStatus.BAD_REQUEST, "Must provide a value for rooms."
)
reason = body.get("reason")
if reason:
if not isinstance(reason, str):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"If a reason is provided it must be a string.",
)
limit = body.get("limit")
if limit:
if not isinstance(limit, int) or limit <= 0:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"If limit is provided it must be a non-negative integer greater than 0.",
)
if not rooms:
rooms = await self._store.get_rooms_for_user(user_id)
redact_id = await self.admin_handler.start_redact_events(
user_id, list(rooms), requester.serialize(), reason, limit
)
return HTTPStatus.OK, {"redact_id": redact_id}
class RedactUserStatus(RestServlet):
"""
Check on the progress of the redaction request represented by the provided ID, returning
the status of the process and a dict of events that were unable to be redacted, if any
"""
PATTERNS = admin_patterns("/user/redact_status/(?P<redact_id>[^/]*)$")
def __init__(self, hs: "HomeServer"):
self._auth = hs.get_auth()
self.admin_handler = hs.get_admin_handler()
async def on_GET(
self, request: SynapseRequest, redact_id: str
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self._auth, request)
task = await self.admin_handler.get_redact_task(redact_id)
if task:
if task.status == TaskStatus.ACTIVE:
return HTTPStatus.OK, {"status": TaskStatus.ACTIVE}
elif task.status == TaskStatus.COMPLETE:
assert task.result is not None
failed_redactions = task.result.get("failed_redactions")
return HTTPStatus.OK, {
"status": TaskStatus.COMPLETE,
"failed_redactions": failed_redactions if failed_redactions else {},
}
elif task.status == TaskStatus.SCHEDULED:
return HTTPStatus.OK, {"status": TaskStatus.SCHEDULED}
else:
return HTTPStatus.OK, {
"status": TaskStatus.FAILED,
"error": (
task.error
if task.error
else "Unknown error, please check the logs for more information."
),
}
else:
raise NotFoundError("redact id '%s' not found" % redact_id)

View File

@ -2467,6 +2467,76 @@ class EventsWorkerStore(SQLBaseStore):
self.invalidate_get_event_cache_after_txn(txn, event_id)
async def get_events_sent_by_user_in_room(
self, user_id: str, room_id: str, limit: int, filter: Optional[List[str]] = None
) -> Optional[List[str]]:
"""
Get a list of event ids of events sent by the user in the specified room
Args:
user_id: user ID to search against
room_id: room ID of the room to search for events in
filter: type of events to filter for
limit: maximum number of event ids to return
"""
def _get_events_by_user_in_room_txn(
txn: LoggingTransaction,
user_id: str,
room_id: str,
filter: Optional[List[str]],
batch_size: int,
offset: int,
) -> Tuple[Optional[List[str]], int]:
if filter:
base_clause, args = make_in_list_sql_clause(
txn.database_engine, "type", filter
)
clause = f"AND {base_clause}"
parameters = (user_id, room_id, *args, batch_size, offset)
else:
clause = ""
parameters = (user_id, room_id, batch_size, offset)
sql = f"""
SELECT event_id FROM events
WHERE sender = ? AND room_id = ?
{clause}
ORDER BY received_ts DESC
LIMIT ?
OFFSET ?
"""
txn.execute(sql, parameters)
res = txn.fetchall()
if res:
events = [row[0] for row in res]
else:
events = None
return events, offset + batch_size
offset = 0
batch_size = 100
if batch_size > limit:
batch_size = limit
selected_ids: List[str] = []
while offset < limit:
res, offset = await self.db_pool.runInteraction(
"get_events_by_user",
_get_events_by_user_in_room_txn,
user_id,
room_id,
filter,
batch_size,
offset,
)
if res:
selected_ids = selected_ids + res
else:
break
return selected_ids
async def have_finished_sliding_sync_background_jobs(self) -> bool:
"""Return if it's safe to use the sliding sync membership tables."""

View File

@ -21,9 +21,11 @@
import hashlib
import hmac
import json
import os
import urllib.parse
from binascii import unhexlify
from http import HTTPStatus
from typing import Dict, List, Optional
from unittest.mock import AsyncMock, Mock, patch
@ -33,7 +35,7 @@ from twisted.test.proto_helpers import MemoryReactor
from twisted.web.resource import Resource
import synapse.rest.admin
from synapse.api.constants import ApprovalNoticeMedium, LoginType, UserTypes
from synapse.api.constants import ApprovalNoticeMedium, EventTypes, LoginType, UserTypes
from synapse.api.errors import Codes, HttpResponseException, ResourceLimitError
from synapse.api.room_versions import RoomVersions
from synapse.media.filepath import MediaFilePaths
@ -5089,3 +5091,271 @@ class UserSuspensionTestCase(unittest.HomeserverTestCase):
res5 = self.get_success(self.store.get_user_suspended_status(self.bad_user))
self.assertEqual(True, res5)
class UserRedactionTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
admin.register_servlets,
room.register_servlets,
sync.register_servlets,
]
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.admin = self.register_user("thomas", "pass", True)
self.admin_tok = self.login("thomas", "pass")
self.bad_user = self.register_user("teresa", "pass")
self.bad_user_tok = self.login("teresa", "pass")
self.store = hs.get_datastores().main
self.spam_checker = hs.get_module_api_callbacks().spam_checker
# create rooms - room versions 11+ store the `redacts` key in content while
# earlier ones don't so we use a mix of room versions
self.rm1 = self.helper.create_room_as(
self.admin, tok=self.admin_tok, room_version="7"
)
self.rm2 = self.helper.create_room_as(self.admin, tok=self.admin_tok)
self.rm3 = self.helper.create_room_as(
self.admin, tok=self.admin_tok, room_version="11"
)
def test_redact_messages_all_rooms(self) -> None:
"""
Test that request to redact events in all rooms user is member of is successful
"""
# join rooms, send some messages
originals = []
for rm in [self.rm1, self.rm2, self.rm3]:
join = self.helper.join(rm, self.bad_user, tok=self.bad_user_tok)
originals.append(join["event_id"])
for i in range(15):
event = {"body": f"hello{i}", "msgtype": "m.text"}
res = self.helper.send_event(
rm, "m.room.message", event, tok=self.bad_user_tok, expect_code=200
)
originals.append(res["event_id"])
# redact all events in all rooms
channel = self.make_request(
"POST",
f"/_synapse/admin/v1/user/{self.bad_user}/redact",
content={"rooms": []},
access_token=self.admin_tok,
)
self.assertEqual(channel.code, 200)
matched = []
for rm in [self.rm1, self.rm2, self.rm3]:
filter = json.dumps({"types": [EventTypes.Redaction]})
channel = self.make_request(
"GET",
f"rooms/{rm}/messages?filter={filter}&limit=50",
access_token=self.admin_tok,
)
self.assertEqual(channel.code, 200)
for event in channel.json_body["chunk"]:
for event_id in originals:
if (
event["type"] == "m.room.redaction"
and event["redacts"] == event_id
):
matched.append(event_id)
self.assertEqual(len(matched), len(originals))
def test_redact_messages_specific_rooms(self) -> None:
"""
Test that request to redact events in specified rooms user is member of is successful
"""
originals = []
for rm in [self.rm1, self.rm2, self.rm3]:
join = self.helper.join(rm, self.bad_user, tok=self.bad_user_tok)
originals.append(join["event_id"])
for i in range(15):
event = {"body": f"hello{i}", "msgtype": "m.text"}
res = self.helper.send_event(
rm, "m.room.message", event, tok=self.bad_user_tok
)
originals.append(res["event_id"])
# redact messages in rooms 1 and 3
channel = self.make_request(
"POST",
f"/_synapse/admin/v1/user/{self.bad_user}/redact",
content={"rooms": [self.rm1, self.rm3]},
access_token=self.admin_tok,
)
self.assertEqual(channel.code, 200)
# messages in requested rooms are redacted
for rm in [self.rm1, self.rm3]:
filter = json.dumps({"types": [EventTypes.Redaction]})
channel = self.make_request(
"GET",
f"rooms/{rm}/messages?filter={filter}&limit=50",
access_token=self.admin_tok,
)
self.assertEqual(channel.code, 200)
matches = []
for event in channel.json_body["chunk"]:
for event_id in originals:
if (
event["type"] == "m.room.redaction"
and event["redacts"] == event_id
):
matches.append((event_id, event))
# we redacted 16 messages
self.assertEqual(len(matches), 16)
channel = self.make_request(
"GET", f"rooms/{self.rm2}/messages?limit=50", access_token=self.admin_tok
)
self.assertEqual(channel.code, 200)
# messages in remaining room are not
for event in channel.json_body["chunk"]:
if event["type"] == "m.room.redaction":
self.fail("found redaction in room 2")
def test_redact_status(self) -> None:
rm2_originals = []
for rm in [self.rm1, self.rm2, self.rm3]:
join = self.helper.join(rm, self.bad_user, tok=self.bad_user_tok)
if rm == self.rm2:
rm2_originals.append(join["event_id"])
for i in range(5):
event = {"body": f"hello{i}", "msgtype": "m.text"}
res = self.helper.send_event(
rm, "m.room.message", event, tok=self.bad_user_tok
)
if rm == self.rm2:
rm2_originals.append(res["event_id"])
# redact messages in rooms 1 and 3
channel = self.make_request(
"POST",
f"/_synapse/admin/v1/user/{self.bad_user}/redact",
content={"rooms": [self.rm1, self.rm3]},
access_token=self.admin_tok,
)
self.assertEqual(channel.code, 200)
id = channel.json_body.get("redact_id")
channel2 = self.make_request(
"GET",
f"/_synapse/admin/v1/user/redact_status/{id}",
access_token=self.admin_tok,
)
self.assertEqual(channel2.code, 200)
self.assertEqual(channel2.json_body.get("status"), "complete")
self.assertEqual(channel2.json_body.get("failed_redactions"), {})
# mock that will cause persisting the redaction events to fail
async def check_event_for_spam(event: str) -> str:
return "spam"
self.spam_checker.check_event_for_spam = check_event_for_spam # type: ignore
channel3 = self.make_request(
"POST",
f"/_synapse/admin/v1/user/{self.bad_user}/redact",
content={"rooms": [self.rm2]},
access_token=self.admin_tok,
)
self.assertEqual(channel.code, 200)
id = channel3.json_body.get("redact_id")
channel4 = self.make_request(
"GET",
f"/_synapse/admin/v1/user/redact_status/{id}",
access_token=self.admin_tok,
)
self.assertEqual(channel4.code, 200)
self.assertEqual(channel4.json_body.get("status"), "complete")
failed_redactions = channel4.json_body.get("failed_redactions")
assert failed_redactions is not None
matched = []
for original in rm2_originals:
if failed_redactions.get(original) is not None:
matched.append(original)
self.assertEqual(len(matched), len(rm2_originals))
def test_admin_redact_works_if_user_kicked_or_banned(self) -> None:
originals = []
for rm in [self.rm1, self.rm2, self.rm3]:
join = self.helper.join(rm, self.bad_user, tok=self.bad_user_tok)
originals.append(join["event_id"])
for i in range(5):
event = {"body": f"hello{i}", "msgtype": "m.text"}
res = self.helper.send_event(
rm, "m.room.message", event, tok=self.bad_user_tok
)
originals.append(res["event_id"])
# kick user from rooms 1 and 3
for r in [self.rm1, self.rm2]:
channel = self.make_request(
"POST",
f"/_matrix/client/r0/rooms/{r}/kick",
content={"reason": "being a bummer", "user_id": self.bad_user},
access_token=self.admin_tok,
)
self.assertEqual(channel.code, HTTPStatus.OK, channel.result)
# redact messages in room 1 and 3
channel1 = self.make_request(
"POST",
f"/_synapse/admin/v1/user/{self.bad_user}/redact",
content={"rooms": [self.rm1, self.rm3]},
access_token=self.admin_tok,
)
self.assertEqual(channel1.code, 200)
id = channel1.json_body.get("redact_id")
# check that there were no failed redactions in room 1 and 3
channel2 = self.make_request(
"GET",
f"/_synapse/admin/v1/user/redact_status/{id}",
access_token=self.admin_tok,
)
self.assertEqual(channel2.code, 200)
self.assertEqual(channel2.json_body.get("status"), "complete")
failed_redactions = channel2.json_body.get("failed_redactions")
self.assertEqual(failed_redactions, {})
# ban user
channel3 = self.make_request(
"POST",
f"/_matrix/client/r0/rooms/{self.rm2}/ban",
content={"reason": "being a bummer", "user_id": self.bad_user},
access_token=self.admin_tok,
)
self.assertEqual(channel3.code, HTTPStatus.OK, channel3.result)
# redact messages in room 2
channel4 = self.make_request(
"POST",
f"/_synapse/admin/v1/user/{self.bad_user}/redact",
content={"rooms": [self.rm2]},
access_token=self.admin_tok,
)
self.assertEqual(channel4.code, 200)
id2 = channel1.json_body.get("redact_id")
# check that there were no failed redactions in room 2
channel5 = self.make_request(
"GET",
f"/_synapse/admin/v1/user/redact_status/{id2}",
access_token=self.admin_tok,
)
self.assertEqual(channel5.code, 200)
self.assertEqual(channel5.json_body.get("status"), "complete")
failed_redactions = channel5.json_body.get("failed_redactions")
self.assertEqual(failed_redactions, {})