Fixed set a user as an admin with the new API (#6928)

Fix #6910
This commit is contained in:
Dirk Klimpel 2020-02-28 10:58:05 +01:00 committed by GitHub
parent ab0073a6c0
commit 9b06d8f8a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 200 additions and 43 deletions

1
changelog.d/6910.bugfix Normal file
View File

@ -0,0 +1 @@
Fixed set a user as an admin with the admin API `PUT /_synapse/admin/v2/users/<user_id>`. Contributed by @dklimpel.

View File

@ -211,9 +211,7 @@ class UserRestServletV2(RestServlet):
if target_user == auth_user and not set_admin_to:
raise SynapseError(400, "You may not demote yourself.")
await self.admin_handler.set_user_server_admin(
target_user, set_admin_to
)
await self.store.set_server_admin(target_user, set_admin_to)
if "password" in body:
if (
@ -651,6 +649,6 @@ class UserAdminServlet(RestServlet):
if target_user == auth_user and not set_admin_to:
raise SynapseError(400, "You may not demote yourself.")
await self.store.set_user_server_admin(target_user, set_admin_to)
await self.store.set_server_admin(target_user, set_admin_to)
return 200, {}

View File

@ -301,12 +301,16 @@ class RegistrationWorkerStore(SQLBaseStore):
admin (bool): true iff the user is to be a server admin,
false otherwise.
"""
return self.db.simple_update_one(
table="users",
keyvalues={"name": user.to_string()},
updatevalues={"admin": 1 if admin else 0},
desc="set_server_admin",
def set_server_admin_txn(txn):
self.db.simple_update_one_txn(
txn, "users", {"name": user.to_string()}, {"admin": 1 if admin else 0}
)
self._invalidate_cache_and_stream(
txn, self.get_user_by_id, (user.to_string(),)
)
return self.db.runInteraction("set_server_admin", set_server_admin_txn)
def _query_for_auth(self, txn, token):
sql = (

View File

@ -16,6 +16,7 @@
import hashlib
import hmac
import json
import urllib.parse
from mock import Mock
@ -371,22 +372,24 @@ class UserRestTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, hs):
self.store = hs.get_datastore()
self.url = "/_synapse/admin/v2/users/@bob:test"
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
self.other_user = self.register_user("user", "pass")
self.other_user_token = self.login("user", "pass")
self.url_other_user = "/_synapse/admin/v2/users/%s" % urllib.parse.quote(
self.other_user
)
def test_requester_is_no_admin(self):
"""
If the user is not a server admin, an error is returned.
"""
self.hs.config.registration_shared_secret = None
url = "/_synapse/admin/v2/users/@bob:test"
request, channel = self.make_request(
"GET", self.url, access_token=self.other_user_token,
"GET", url, access_token=self.other_user_token,
)
self.render(request)
@ -394,7 +397,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual("You are not a server admin", channel.json_body["error"])
request, channel = self.make_request(
"PUT", self.url, access_token=self.other_user_token, content=b"{}",
"PUT", url, access_token=self.other_user_token, content=b"{}",
)
self.render(request)
@ -417,24 +420,26 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(404, channel.code, msg=channel.json_body)
self.assertEqual("M_NOT_FOUND", channel.json_body["errcode"])
def test_requester_is_admin(self):
def test_create_server_admin(self):
"""
If the user is a server admin, a new user is created.
Check that a new admin user is created successfully.
"""
self.hs.config.registration_shared_secret = None
url = "/_synapse/admin/v2/users/@bob:test"
# Create user (server admin)
body = json.dumps(
{
"password": "abc123",
"admin": True,
"displayname": "Bob's name",
"threepids": [{"medium": "email", "address": "bob@bob.bob"}],
}
)
# Create user
request, channel = self.make_request(
"PUT",
self.url,
url,
access_token=self.admin_user_tok,
content=body.encode(encoding="utf_8"),
)
@ -442,29 +447,85 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(201, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual("@bob:test", channel.json_body["name"])
self.assertEqual("bob", channel.json_body["displayname"])
self.assertEqual("Bob's name", channel.json_body["displayname"])
self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
self.assertEqual("bob@bob.bob", channel.json_body["threepids"][0]["address"])
self.assertEqual(True, channel.json_body["admin"])
# Get user
request, channel = self.make_request(
"GET", self.url, access_token=self.admin_user_tok,
"GET", url, access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual("@bob:test", channel.json_body["name"])
self.assertEqual("bob", channel.json_body["displayname"])
self.assertEqual(1, channel.json_body["admin"])
self.assertEqual(0, channel.json_body["is_guest"])
self.assertEqual(0, channel.json_body["deactivated"])
self.assertEqual("Bob's name", channel.json_body["displayname"])
self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
self.assertEqual("bob@bob.bob", channel.json_body["threepids"][0]["address"])
self.assertEqual(True, channel.json_body["admin"])
self.assertEqual(False, channel.json_body["is_guest"])
self.assertEqual(False, channel.json_body["deactivated"])
def test_create_user(self):
"""
Check that a new regular user is created successfully.
"""
self.hs.config.registration_shared_secret = None
url = "/_synapse/admin/v2/users/@bob:test"
# Create user
body = json.dumps(
{
"password": "abc123",
"admin": False,
"displayname": "Bob's name",
"threepids": [{"medium": "email", "address": "bob@bob.bob"}],
}
)
request, channel = self.make_request(
"PUT",
url,
access_token=self.admin_user_tok,
content=body.encode(encoding="utf_8"),
)
self.render(request)
self.assertEqual(201, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual("@bob:test", channel.json_body["name"])
self.assertEqual("Bob's name", channel.json_body["displayname"])
self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
self.assertEqual("bob@bob.bob", channel.json_body["threepids"][0]["address"])
self.assertEqual(False, channel.json_body["admin"])
# Get user
request, channel = self.make_request(
"GET", url, access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual("@bob:test", channel.json_body["name"])
self.assertEqual("Bob's name", channel.json_body["displayname"])
self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
self.assertEqual("bob@bob.bob", channel.json_body["threepids"][0]["address"])
self.assertEqual(False, channel.json_body["admin"])
self.assertEqual(False, channel.json_body["is_guest"])
self.assertEqual(False, channel.json_body["deactivated"])
def test_set_password(self):
"""
Test setting a new password for another user.
"""
self.hs.config.registration_shared_secret = None
# Change password
body = json.dumps({"password": "hahaha"})
request, channel = self.make_request(
"PUT",
self.url,
self.url_other_user,
access_token=self.admin_user_tok,
content=body.encode(encoding="utf_8"),
)
@ -472,41 +533,133 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
def test_set_displayname(self):
"""
Test setting the displayname of another user.
"""
self.hs.config.registration_shared_secret = None
# Modify user
body = json.dumps({"displayname": "foobar"})
request, channel = self.make_request(
"PUT",
self.url_other_user,
access_token=self.admin_user_tok,
content=body.encode(encoding="utf_8"),
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual("foobar", channel.json_body["displayname"])
# Get user
request, channel = self.make_request(
"GET", self.url_other_user, access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual("foobar", channel.json_body["displayname"])
def test_set_threepid(self):
"""
Test setting threepid for an other user.
"""
self.hs.config.registration_shared_secret = None
# Delete old and add new threepid to user
body = json.dumps(
{
"displayname": "foobar",
"deactivated": True,
"threepids": [{"medium": "email", "address": "bob2@bob.bob"}],
}
{"threepids": [{"medium": "email", "address": "bob3@bob.bob"}]}
)
request, channel = self.make_request(
"PUT",
self.url,
self.url_other_user,
access_token=self.admin_user_tok,
content=body.encode(encoding="utf_8"),
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual("@bob:test", channel.json_body["name"])
self.assertEqual("foobar", channel.json_body["displayname"])
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
self.assertEqual("bob3@bob.bob", channel.json_body["threepids"][0]["address"])
# Get user
request, channel = self.make_request(
"GET", self.url_other_user, access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
self.assertEqual("bob3@bob.bob", channel.json_body["threepids"][0]["address"])
def test_deactivate_user(self):
"""
Test deactivating another user.
"""
# Deactivate user
body = json.dumps({"deactivated": True})
request, channel = self.make_request(
"PUT",
self.url_other_user,
access_token=self.admin_user_tok,
content=body.encode(encoding="utf_8"),
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(True, channel.json_body["deactivated"])
# the user is deactivated, the threepid will be deleted
# Get user
request, channel = self.make_request(
"GET", self.url, access_token=self.admin_user_tok,
"GET", self.url_other_user, access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual("@bob:test", channel.json_body["name"])
self.assertEqual("foobar", channel.json_body["displayname"])
self.assertEqual(1, channel.json_body["admin"])
self.assertEqual(0, channel.json_body["is_guest"])
self.assertEqual(1, channel.json_body["deactivated"])
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(True, channel.json_body["deactivated"])
def test_set_user_as_admin(self):
"""
Test setting the admin flag on a user.
"""
self.hs.config.registration_shared_secret = None
# Set a user as an admin
body = json.dumps({"admin": True})
request, channel = self.make_request(
"PUT",
self.url_other_user,
access_token=self.admin_user_tok,
content=body.encode(encoding="utf_8"),
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(True, channel.json_body["admin"])
# Get user
request, channel = self.make_request(
"GET", self.url_other_user, access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual("@user:test", channel.json_body["name"])
self.assertEqual(True, channel.json_body["admin"])
def test_accidental_deactivation_prevention(self):
"""
@ -514,13 +667,14 @@ class UserRestTestCase(unittest.HomeserverTestCase):
for the deactivated body parameter
"""
self.hs.config.registration_shared_secret = None
url = "/_synapse/admin/v2/users/@bob:test"
# Create user
body = json.dumps({"password": "abc123"})
request, channel = self.make_request(
"PUT",
self.url,
url,
access_token=self.admin_user_tok,
content=body.encode(encoding="utf_8"),
)
@ -532,7 +686,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
# Get user
request, channel = self.make_request(
"GET", self.url, access_token=self.admin_user_tok,
"GET", url, access_token=self.admin_user_tok,
)
self.render(request)
@ -546,7 +700,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
request, channel = self.make_request(
"PUT",
self.url,
url,
access_token=self.admin_user_tok,
content=body.encode(encoding="utf_8"),
)
@ -556,7 +710,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
# Check user is not deactivated
request, channel = self.make_request(
"GET", self.url, access_token=self.admin_user_tok,
"GET", url, access_token=self.admin_user_tok,
)
self.render(request)