Allow server admin to get admin bit in rooms where local user is an admin (#8756)

This adds an admin API that allows a server admin to get power in a room if a local user has power in a room. Will also invite the user if they're not in the room and its a private room. Can specify another user (rather than the admin user) to be granted power.

Co-authored-by: Matthew Hodgson <matthew@matrix.org>
This commit is contained in:
Erik Johnston 2020-12-18 15:37:19 +00:00 committed by GitHub
parent 5e7d75daa2
commit d781a81e69
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 294 additions and 3 deletions

View file

@ -16,8 +16,8 @@ import logging
from http import HTTPStatus
from typing import TYPE_CHECKING, List, Optional, Tuple
from synapse.api.constants import EventTypes, JoinRules
from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
from synapse.http.servlet import (
RestServlet,
assert_params_in_dict,
@ -37,6 +37,7 @@ from synapse.types import JsonDict, RoomAlias, RoomID, UserID, create_requester
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@ -367,3 +368,134 @@ class JoinRoomAliasServlet(RestServlet):
)
return 200, {"room_id": room_id}
class MakeRoomAdminRestServlet(RestServlet):
"""Allows a server admin to get power in a room if a local user has power in
a room. Will also invite the user if they're not in the room and it's a
private room. Can specify another user (rather than the admin user) to be
granted power, e.g.:
POST/_synapse/admin/v1/rooms/<room_id_or_alias>/make_room_admin
{
"user_id": "@foo:example.com"
}
"""
PATTERNS = admin_patterns("/rooms/(?P<room_identifier>[^/]*)/make_room_admin")
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
self.room_member_handler = hs.get_room_member_handler()
self.event_creation_handler = hs.get_event_creation_handler()
self.state_handler = hs.get_state_handler()
self.is_mine_id = hs.is_mine_id
async def on_POST(self, request, room_identifier):
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
content = parse_json_object_from_request(request, allow_empty_body=True)
# Resolve to a room ID, if necessary.
if RoomID.is_valid(room_identifier):
room_id = room_identifier
elif RoomAlias.is_valid(room_identifier):
room_alias = RoomAlias.from_string(room_identifier)
room_id, _ = await self.room_member_handler.lookup_room_alias(room_alias)
room_id = room_id.to_string()
else:
raise SynapseError(
400, "%s was not legal room ID or room alias" % (room_identifier,)
)
# Which user to grant room admin rights to.
user_to_add = content.get("user_id", requester.user.to_string())
# Figure out which local users currently have power in the room, if any.
room_state = await self.state_handler.get_current_state(room_id)
if not room_state:
raise SynapseError(400, "Server not in room")
create_event = room_state[(EventTypes.Create, "")]
power_levels = room_state.get((EventTypes.PowerLevels, ""))
if power_levels is not None:
# We pick the local user with the highest power.
user_power = power_levels.content.get("users", {})
admin_users = [
user_id for user_id in user_power if self.is_mine_id(user_id)
]
admin_users.sort(key=lambda user: user_power[user])
if not admin_users:
raise SynapseError(400, "No local admin user in room")
admin_user_id = admin_users[-1]
pl_content = power_levels.content
else:
# If there is no power level events then the creator has rights.
pl_content = {}
admin_user_id = create_event.sender
if not self.is_mine_id(admin_user_id):
raise SynapseError(
400, "No local admin user in room",
)
# Grant the user power equal to the room admin by attempting to send an
# updated power level event.
new_pl_content = dict(pl_content)
new_pl_content["users"] = dict(pl_content.get("users", {}))
new_pl_content["users"][user_to_add] = new_pl_content["users"][admin_user_id]
fake_requester = create_requester(
admin_user_id, authenticated_entity=requester.authenticated_entity,
)
try:
await self.event_creation_handler.create_and_send_nonmember_event(
fake_requester,
event_dict={
"content": new_pl_content,
"sender": admin_user_id,
"type": EventTypes.PowerLevels,
"state_key": "",
"room_id": room_id,
},
)
except AuthError:
# The admin user we found turned out not to have enough power.
raise SynapseError(
400, "No local admin user in room with power to update power levels."
)
# Now we check if the user we're granting admin rights to is already in
# the room. If not and it's not a public room we invite them.
member_event = room_state.get((EventTypes.Member, user_to_add))
is_joined = False
if member_event:
is_joined = member_event.content["membership"] in (
Membership.JOIN,
Membership.INVITE,
)
if is_joined:
return 200, {}
join_rules = room_state.get((EventTypes.JoinRules, ""))
is_public = False
if join_rules:
is_public = join_rules.content.get("join_rule") == JoinRules.PUBLIC
if is_public:
return 200, {}
await self.room_member_handler.update_membership(
fake_requester,
target=UserID.from_string(user_to_add),
room_id=room_id,
action=Membership.INVITE,
)
return 200, {}