Add type hints to tests/rest/client (#12108)

* Add type hints to `tests/rest/client`

* newsfile

* fix imports

* add `test_account.py`

* Remove one type hint in `test_report_event.py`

* change `on_create_room` to `async`

* update new functions in `test_third_party_rules.py`

* Add `test_filter.py`

* add `test_rooms.py`

* change to `assertEquals` to `assertEqual`

* lint
This commit is contained in:
Dirk Klimpel 2022-03-02 17:34:14 +01:00 committed by GitHub
parent b4461e7d8a
commit 2ffaf30803
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 421 additions and 350 deletions

View file

@ -15,11 +15,12 @@ import json
import os
import re
from email.parser import Parser
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional, Union
from unittest.mock import Mock
import pkg_resources
from twisted.internet.interfaces import IReactorTCP
from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
@ -30,6 +31,7 @@ from synapse.rest import admin
from synapse.rest.client import account, login, register, room
from synapse.rest.synapse.client.password_reset import PasswordResetSubmitTokenResource
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock
from tests import unittest
@ -46,7 +48,7 @@ class PasswordResetTestCase(unittest.HomeserverTestCase):
login.register_servlets,
]
def make_homeserver(self, reactor, clock):
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
config = self.default_config()
# Email config.
@ -67,20 +69,27 @@ class PasswordResetTestCase(unittest.HomeserverTestCase):
hs = self.setup_test_homeserver(config=config)
async def sendmail(
reactor, smtphost, smtpport, from_addr, to_addrs, msg, **kwargs
):
self.email_attempts.append(msg)
reactor: IReactorTCP,
smtphost: str,
smtpport: int,
from_addr: str,
to_addr: str,
msg_bytes: bytes,
*args: Any,
**kwargs: Any,
) -> None:
self.email_attempts.append(msg_bytes)
self.email_attempts = []
self.email_attempts: List[bytes] = []
hs.get_send_email_handler()._sendmail = sendmail
return hs
def prepare(self, reactor, clock, hs):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.submit_token_resource = PasswordResetSubmitTokenResource(hs)
def test_basic_password_reset(self):
def test_basic_password_reset(self) -> None:
"""Test basic password reset flow"""
old_password = "monkey"
new_password = "kangeroo"
@ -118,7 +127,7 @@ class PasswordResetTestCase(unittest.HomeserverTestCase):
self.attempt_wrong_password_login("kermit", old_password)
@override_config({"rc_3pid_validation": {"burst_count": 3}})
def test_ratelimit_by_email(self):
def test_ratelimit_by_email(self) -> None:
"""Test that we ratelimit /requestToken for the same email."""
old_password = "monkey"
new_password = "kangeroo"
@ -139,7 +148,7 @@ class PasswordResetTestCase(unittest.HomeserverTestCase):
)
)
def reset(ip):
def reset(ip: str) -> None:
client_secret = "foobar"
session_id = self._request_token(email, client_secret, ip)
@ -166,7 +175,7 @@ class PasswordResetTestCase(unittest.HomeserverTestCase):
self.assertEqual(cm.exception.code, 429)
def test_basic_password_reset_canonicalise_email(self):
def test_basic_password_reset_canonicalise_email(self) -> None:
"""Test basic password reset flow
Request password reset with different spelling
"""
@ -206,7 +215,7 @@ class PasswordResetTestCase(unittest.HomeserverTestCase):
# Assert we can't log in with the old password
self.attempt_wrong_password_login("kermit", old_password)
def test_cant_reset_password_without_clicking_link(self):
def test_cant_reset_password_without_clicking_link(self) -> None:
"""Test that we do actually need to click the link in the email"""
old_password = "monkey"
new_password = "kangeroo"
@ -241,7 +250,7 @@ class PasswordResetTestCase(unittest.HomeserverTestCase):
# Assert we can't log in with the new password
self.attempt_wrong_password_login("kermit", new_password)
def test_no_valid_token(self):
def test_no_valid_token(self) -> None:
"""Test that we do actually need to request a token and can't just
make a session up.
"""
@ -277,7 +286,7 @@ class PasswordResetTestCase(unittest.HomeserverTestCase):
self.attempt_wrong_password_login("kermit", new_password)
@unittest.override_config({"request_token_inhibit_3pid_errors": True})
def test_password_reset_bad_email_inhibit_error(self):
def test_password_reset_bad_email_inhibit_error(self) -> None:
"""Test that triggering a password reset with an email address that isn't bound
to an account doesn't leak the lack of binding for that address if configured
that way.
@ -292,7 +301,12 @@ class PasswordResetTestCase(unittest.HomeserverTestCase):
self.assertIsNotNone(session_id)
def _request_token(self, email, client_secret, ip="127.0.0.1"):
def _request_token(
self,
email: str,
client_secret: str,
ip: str = "127.0.0.1",
) -> str:
channel = self.make_request(
"POST",
b"account/password/email/requestToken",
@ -309,7 +323,7 @@ class PasswordResetTestCase(unittest.HomeserverTestCase):
return channel.json_body["sid"]
def _validate_token(self, link):
def _validate_token(self, link: str) -> None:
# Remove the host
path = link.replace("https://example.com", "")
@ -339,7 +353,7 @@ class PasswordResetTestCase(unittest.HomeserverTestCase):
)
self.assertEqual(200, channel.code, channel.result)
def _get_link_from_email(self):
def _get_link_from_email(self) -> str:
assert self.email_attempts, "No emails have been sent"
raw_msg = self.email_attempts[-1].decode("UTF-8")
@ -354,14 +368,19 @@ class PasswordResetTestCase(unittest.HomeserverTestCase):
if not text:
self.fail("Could not find text portion of email to parse")
assert text is not None
match = re.search(r"https://example.com\S+", text)
assert match, "Could not find link in email"
return match.group(0)
def _reset_password(
self, new_password, session_id, client_secret, expected_code=200
):
self,
new_password: str,
session_id: str,
client_secret: str,
expected_code: int = 200,
) -> None:
channel = self.make_request(
"POST",
b"account/password",
@ -388,11 +407,11 @@ class DeactivateTestCase(unittest.HomeserverTestCase):
room.register_servlets,
]
def make_homeserver(self, reactor, clock):
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
self.hs = self.setup_test_homeserver()
return self.hs
def test_deactivate_account(self):
def test_deactivate_account(self) -> None:
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test")
@ -407,7 +426,7 @@ class DeactivateTestCase(unittest.HomeserverTestCase):
channel = self.make_request("GET", "account/whoami", access_token=tok)
self.assertEqual(channel.code, 401)
def test_pending_invites(self):
def test_pending_invites(self) -> None:
"""Tests that deactivating a user rejects every pending invite for them."""
store = self.hs.get_datastores().main
@ -448,7 +467,7 @@ class DeactivateTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(memberships), 1, memberships)
self.assertEqual(memberships[0].room_id, room_id, memberships)
def deactivate(self, user_id, tok):
def deactivate(self, user_id: str, tok: str) -> None:
request_data = json.dumps(
{
"auth": {
@ -474,12 +493,12 @@ class WhoamiTestCase(unittest.HomeserverTestCase):
register.register_servlets,
]
def default_config(self):
def default_config(self) -> Dict[str, Any]:
config = super().default_config()
config["allow_guest_access"] = True
return config
def test_GET_whoami(self):
def test_GET_whoami(self) -> None:
device_id = "wouldgohere"
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test", device_id=device_id)
@ -496,7 +515,7 @@ class WhoamiTestCase(unittest.HomeserverTestCase):
},
)
def test_GET_whoami_guests(self):
def test_GET_whoami_guests(self) -> None:
channel = self.make_request(
b"POST", b"/_matrix/client/r0/register?kind=guest", b"{}"
)
@ -516,7 +535,7 @@ class WhoamiTestCase(unittest.HomeserverTestCase):
},
)
def test_GET_whoami_appservices(self):
def test_GET_whoami_appservices(self) -> None:
user_id = "@as:test"
as_token = "i_am_an_app_service"
@ -541,7 +560,7 @@ class WhoamiTestCase(unittest.HomeserverTestCase):
)
self.assertFalse(hasattr(whoami, "device_id"))
def _whoami(self, tok):
def _whoami(self, tok: str) -> JsonDict:
channel = self.make_request("GET", "account/whoami", {}, access_token=tok)
self.assertEqual(channel.code, 200)
return channel.json_body
@ -555,7 +574,7 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
synapse.rest.admin.register_servlets_for_client_rest_resource,
]
def make_homeserver(self, reactor, clock):
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
config = self.default_config()
# Email config.
@ -576,16 +595,23 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
self.hs = self.setup_test_homeserver(config=config)
async def sendmail(
reactor, smtphost, smtpport, from_addr, to_addrs, msg, **kwargs
):
self.email_attempts.append(msg)
reactor: IReactorTCP,
smtphost: str,
smtpport: int,
from_addr: str,
to_addr: str,
msg_bytes: bytes,
*args: Any,
**kwargs: Any,
) -> None:
self.email_attempts.append(msg_bytes)
self.email_attempts = []
self.email_attempts: List[bytes] = []
self.hs.get_send_email_handler()._sendmail = sendmail
return self.hs
def prepare(self, reactor, clock, hs):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.user_id = self.register_user("kermit", "test")
@ -593,83 +619,73 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
self.email = "test@example.com"
self.url_3pid = b"account/3pid"
def test_add_valid_email(self):
self.get_success(self._add_email(self.email, self.email))
def test_add_valid_email(self) -> None:
self._add_email(self.email, self.email)
def test_add_valid_email_second_time(self):
self.get_success(self._add_email(self.email, self.email))
self.get_success(
self._request_token_invalid_email(
self.email,
expected_errcode=Codes.THREEPID_IN_USE,
expected_error="Email is already in use",
)
def test_add_valid_email_second_time(self) -> None:
self._add_email(self.email, self.email)
self._request_token_invalid_email(
self.email,
expected_errcode=Codes.THREEPID_IN_USE,
expected_error="Email is already in use",
)
def test_add_valid_email_second_time_canonicalise(self):
self.get_success(self._add_email(self.email, self.email))
self.get_success(
self._request_token_invalid_email(
"TEST@EXAMPLE.COM",
expected_errcode=Codes.THREEPID_IN_USE,
expected_error="Email is already in use",
)
def test_add_valid_email_second_time_canonicalise(self) -> None:
self._add_email(self.email, self.email)
self._request_token_invalid_email(
"TEST@EXAMPLE.COM",
expected_errcode=Codes.THREEPID_IN_USE,
expected_error="Email is already in use",
)
def test_add_email_no_at(self):
self.get_success(
self._request_token_invalid_email(
"address-without-at.bar",
expected_errcode=Codes.UNKNOWN,
expected_error="Unable to parse email address",
)
def test_add_email_no_at(self) -> None:
self._request_token_invalid_email(
"address-without-at.bar",
expected_errcode=Codes.UNKNOWN,
expected_error="Unable to parse email address",
)
def test_add_email_two_at(self):
self.get_success(
self._request_token_invalid_email(
"foo@foo@test.bar",
expected_errcode=Codes.UNKNOWN,
expected_error="Unable to parse email address",
)
def test_add_email_two_at(self) -> None:
self._request_token_invalid_email(
"foo@foo@test.bar",
expected_errcode=Codes.UNKNOWN,
expected_error="Unable to parse email address",
)
def test_add_email_bad_format(self):
self.get_success(
self._request_token_invalid_email(
"user@bad.example.net@good.example.com",
expected_errcode=Codes.UNKNOWN,
expected_error="Unable to parse email address",
)
def test_add_email_bad_format(self) -> None:
self._request_token_invalid_email(
"user@bad.example.net@good.example.com",
expected_errcode=Codes.UNKNOWN,
expected_error="Unable to parse email address",
)
def test_add_email_domain_to_lower(self):
self.get_success(self._add_email("foo@TEST.BAR", "foo@test.bar"))
def test_add_email_domain_to_lower(self) -> None:
self._add_email("foo@TEST.BAR", "foo@test.bar")
def test_add_email_domain_with_umlaut(self):
self.get_success(self._add_email("foo@Öumlaut.com", "foo@öumlaut.com"))
def test_add_email_domain_with_umlaut(self) -> None:
self._add_email("foo@Öumlaut.com", "foo@öumlaut.com")
def test_add_email_address_casefold(self):
self.get_success(self._add_email("Strauß@Example.com", "strauss@example.com"))
def test_add_email_address_casefold(self) -> None:
self._add_email("Strauß@Example.com", "strauss@example.com")
def test_address_trim(self):
self.get_success(self._add_email(" foo@test.bar ", "foo@test.bar"))
def test_address_trim(self) -> None:
self._add_email(" foo@test.bar ", "foo@test.bar")
@override_config({"rc_3pid_validation": {"burst_count": 3}})
def test_ratelimit_by_ip(self):
def test_ratelimit_by_ip(self) -> None:
"""Tests that adding emails is ratelimited by IP"""
# We expect to be able to set three emails before getting ratelimited.
self.get_success(self._add_email("foo1@test.bar", "foo1@test.bar"))
self.get_success(self._add_email("foo2@test.bar", "foo2@test.bar"))
self.get_success(self._add_email("foo3@test.bar", "foo3@test.bar"))
self._add_email("foo1@test.bar", "foo1@test.bar")
self._add_email("foo2@test.bar", "foo2@test.bar")
self._add_email("foo3@test.bar", "foo3@test.bar")
with self.assertRaises(HttpResponseException) as cm:
self.get_success(self._add_email("foo4@test.bar", "foo4@test.bar"))
self._add_email("foo4@test.bar", "foo4@test.bar")
self.assertEqual(cm.exception.code, 429)
def test_add_email_if_disabled(self):
def test_add_email_if_disabled(self) -> None:
"""Test adding email to profile when doing so is disallowed"""
self.hs.config.registration.enable_3pid_changes = False
@ -695,7 +711,7 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
},
access_token=self.user_id_tok,
)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(400, channel.code, msg=channel.result["body"])
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
# Get user
@ -705,10 +721,10 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
access_token=self.user_id_tok,
)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(200, channel.code, msg=channel.result["body"])
self.assertFalse(channel.json_body["threepids"])
def test_delete_email(self):
def test_delete_email(self) -> None:
"""Test deleting an email from profile"""
# Add a threepid
self.get_success(
@ -727,7 +743,7 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
{"medium": "email", "address": self.email},
access_token=self.user_id_tok,
)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(200, channel.code, msg=channel.result["body"])
# Get user
channel = self.make_request(
@ -736,10 +752,10 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
access_token=self.user_id_tok,
)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(200, channel.code, msg=channel.result["body"])
self.assertFalse(channel.json_body["threepids"])
def test_delete_email_if_disabled(self):
def test_delete_email_if_disabled(self) -> None:
"""Test deleting an email from profile when disallowed"""
self.hs.config.registration.enable_3pid_changes = False
@ -761,7 +777,7 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
access_token=self.user_id_tok,
)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(400, channel.code, msg=channel.result["body"])
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
# Get user
@ -771,11 +787,11 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
access_token=self.user_id_tok,
)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(200, channel.code, msg=channel.result["body"])
self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
self.assertEqual(self.email, channel.json_body["threepids"][0]["address"])
def test_cant_add_email_without_clicking_link(self):
def test_cant_add_email_without_clicking_link(self) -> None:
"""Test that we do actually need to click the link in the email"""
client_secret = "foobar"
session_id = self._request_token(self.email, client_secret)
@ -797,7 +813,7 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
},
access_token=self.user_id_tok,
)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(400, channel.code, msg=channel.result["body"])
self.assertEqual(Codes.THREEPID_AUTH_FAILED, channel.json_body["errcode"])
# Get user
@ -807,10 +823,10 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
access_token=self.user_id_tok,
)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(200, channel.code, msg=channel.result["body"])
self.assertFalse(channel.json_body["threepids"])
def test_no_valid_token(self):
def test_no_valid_token(self) -> None:
"""Test that we do actually need to request a token and can't just
make a session up.
"""
@ -832,7 +848,7 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
},
access_token=self.user_id_tok,
)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(400, channel.code, msg=channel.result["body"])
self.assertEqual(Codes.THREEPID_AUTH_FAILED, channel.json_body["errcode"])
# Get user
@ -842,11 +858,11 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
access_token=self.user_id_tok,
)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(200, channel.code, msg=channel.result["body"])
self.assertFalse(channel.json_body["threepids"])
@override_config({"next_link_domain_whitelist": None})
def test_next_link(self):
def test_next_link(self) -> None:
"""Tests a valid next_link parameter value with no whitelist (good case)"""
self._request_token(
"something@example.com",
@ -856,7 +872,7 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
)
@override_config({"next_link_domain_whitelist": None})
def test_next_link_exotic_protocol(self):
def test_next_link_exotic_protocol(self) -> None:
"""Tests using a esoteric protocol as a next_link parameter value.
Someone may be hosting a client on IPFS etc.
"""
@ -868,7 +884,7 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
)
@override_config({"next_link_domain_whitelist": None})
def test_next_link_file_uri(self):
def test_next_link_file_uri(self) -> None:
"""Tests next_link parameters cannot be file URI"""
# Attempt to use a next_link value that points to the local disk
self._request_token(
@ -879,7 +895,7 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
)
@override_config({"next_link_domain_whitelist": ["example.com", "example.org"]})
def test_next_link_domain_whitelist(self):
def test_next_link_domain_whitelist(self) -> None:
"""Tests next_link parameters must fit the whitelist if provided"""
# Ensure not providing a next_link parameter still works
@ -912,7 +928,7 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
)
@override_config({"next_link_domain_whitelist": []})
def test_empty_next_link_domain_whitelist(self):
def test_empty_next_link_domain_whitelist(self) -> None:
"""Tests an empty next_lint_domain_whitelist value, meaning next_link is essentially
disallowed
"""
@ -962,28 +978,28 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
def _request_token_invalid_email(
self,
email,
expected_errcode,
expected_error,
client_secret="foobar",
):
email: str,
expected_errcode: str,
expected_error: str,
client_secret: str = "foobar",
) -> None:
channel = self.make_request(
"POST",
b"account/3pid/email/requestToken",
{"client_secret": client_secret, "email": email, "send_attempt": 1},
)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(400, channel.code, msg=channel.result["body"])
self.assertEqual(expected_errcode, channel.json_body["errcode"])
self.assertEqual(expected_error, channel.json_body["error"])
def _validate_token(self, link):
def _validate_token(self, link: str) -> None:
# Remove the host
path = link.replace("https://example.com", "")
channel = self.make_request("GET", path, shorthand=False)
self.assertEqual(200, channel.code, channel.result)
def _get_link_from_email(self):
def _get_link_from_email(self) -> str:
assert self.email_attempts, "No emails have been sent"
raw_msg = self.email_attempts[-1].decode("UTF-8")
@ -998,12 +1014,13 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
if not text:
self.fail("Could not find text portion of email to parse")
assert text is not None
match = re.search(r"https://example.com\S+", text)
assert match, "Could not find link in email"
return match.group(0)
def _add_email(self, request_email, expected_email):
def _add_email(self, request_email: str, expected_email: str) -> None:
"""Test adding an email to profile"""
previous_email_attempts = len(self.email_attempts)
@ -1030,7 +1047,7 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
access_token=self.user_id_tok,
)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(200, channel.code, msg=channel.result["body"])
# Get user
channel = self.make_request(
@ -1039,7 +1056,7 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
access_token=self.user_id_tok,
)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(200, channel.code, msg=channel.result["body"])
self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
threepids = {threepid["address"] for threepid in channel.json_body["threepids"]}
@ -1055,18 +1072,18 @@ class AccountStatusTestCase(unittest.HomeserverTestCase):
url = "/_matrix/client/unstable/org.matrix.msc3720/account_status"
def make_homeserver(self, reactor, clock):
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
config = self.default_config()
config["experimental_features"] = {"msc3720_enabled": True}
return self.setup_test_homeserver(config=config)
def prepare(self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.requester = self.register_user("requester", "password")
self.requester_tok = self.login("requester", "password")
self.server_name = homeserver.config.server.server_name
self.server_name = hs.config.server.server_name
def test_missing_mxid(self):
def test_missing_mxid(self) -> None:
"""Tests that not providing any MXID raises an error."""
self._test_status(
users=None,
@ -1074,7 +1091,7 @@ class AccountStatusTestCase(unittest.HomeserverTestCase):
expected_errcode=Codes.MISSING_PARAM,
)
def test_invalid_mxid(self):
def test_invalid_mxid(self) -> None:
"""Tests that providing an invalid MXID raises an error."""
self._test_status(
users=["bad:test"],
@ -1082,7 +1099,7 @@ class AccountStatusTestCase(unittest.HomeserverTestCase):
expected_errcode=Codes.INVALID_PARAM,
)
def test_local_user_not_exists(self):
def test_local_user_not_exists(self) -> None:
"""Tests that the account status endpoints correctly reports that a user doesn't
exist.
"""
@ -1098,7 +1115,7 @@ class AccountStatusTestCase(unittest.HomeserverTestCase):
expected_failures=[],
)
def test_local_user_exists(self):
def test_local_user_exists(self) -> None:
"""Tests that the account status endpoint correctly reports that a user doesn't
exist.
"""
@ -1115,7 +1132,7 @@ class AccountStatusTestCase(unittest.HomeserverTestCase):
expected_failures=[],
)
def test_local_user_deactivated(self):
def test_local_user_deactivated(self) -> None:
"""Tests that the account status endpoint correctly reports a deactivated user."""
user = self.register_user("someuser", "password")
self.get_success(
@ -1135,7 +1152,7 @@ class AccountStatusTestCase(unittest.HomeserverTestCase):
expected_failures=[],
)
def test_mixed_local_and_remote_users(self):
def test_mixed_local_and_remote_users(self) -> None:
"""Tests that if some users are remote the account status endpoint correctly
merges the remote responses with the local result.
"""
@ -1150,7 +1167,13 @@ class AccountStatusTestCase(unittest.HomeserverTestCase):
"@bad:badremote",
]
async def post_json(destination, path, data, *a, **kwa):
async def post_json(
destination: str,
path: str,
data: Optional[JsonDict] = None,
*a: Any,
**kwa: Any,
) -> Union[JsonDict, list]:
if destination == "remote":
return {
"account_statuses": {
@ -1160,9 +1183,7 @@ class AccountStatusTestCase(unittest.HomeserverTestCase):
},
}
}
if destination == "otherremote":
return {}
if destination == "badremote":
elif destination == "badremote":
# badremote tries to overwrite the status of a user that doesn't belong
# to it (i.e. users[1]) with false data, which Synapse is expected to
# ignore.
@ -1176,6 +1197,9 @@ class AccountStatusTestCase(unittest.HomeserverTestCase):
},
}
}
# if destination == "otherremote"
else:
return {}
# Register a mock that will return the expected result depending on the remote.
self.hs.get_federation_http_client().post_json = Mock(side_effect=post_json)
@ -1205,7 +1229,7 @@ class AccountStatusTestCase(unittest.HomeserverTestCase):
expected_statuses: Optional[Dict[str, Dict[str, bool]]] = None,
expected_failures: Optional[List[str]] = None,
expected_errcode: Optional[str] = None,
):
) -> None:
"""Send a request to the account status endpoint and check that the response
matches with what's expected.