Fix type hints in test_login.py

This commit is contained in:
Richard van der Hoff 2021-01-06 15:51:18 +00:00
parent 3fc2399dbe
commit bbd04441ed
2 changed files with 55 additions and 24 deletions

View File

@ -103,6 +103,7 @@ files =
tests/replication, tests/replication,
tests/test_utils, tests/test_utils,
tests/handlers/test_password_providers.py, tests/handlers/test_password_providers.py,
tests/rest/client/v1/test_login.py,
tests/rest/client/v2_alpha/test_auth.py, tests/rest/client/v2_alpha/test_auth.py,
tests/util/test_stream_change_cache.py tests/util/test_stream_change_cache.py

View File

@ -1,14 +1,24 @@
import json # -*- coding: utf-8 -*-
# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time import time
import urllib.parse import urllib.parse
from typing import Any, Dict, Union
from mock import Mock from mock import Mock
try:
import jwt
except ImportError:
jwt = None
import synapse.rest.admin import synapse.rest.admin
from synapse.appservice import ApplicationService from synapse.appservice import ApplicationService
from synapse.rest.client.v1 import login, logout from synapse.rest.client.v1 import login, logout
@ -16,7 +26,33 @@ from synapse.rest.client.v2_alpha import devices, register
from synapse.rest.client.v2_alpha.account import WhoamiRestServlet from synapse.rest.client.v2_alpha.account import WhoamiRestServlet
from tests import unittest from tests import unittest
from tests.unittest import override_config from tests.unittest import override_config, skip_unless
try:
import jwt
HAS_JWT = True
except ImportError:
HAS_JWT = False
# public_base_url used in some tests
BASE_URL = "https://synapse/"
# CAS server used in some tests
CAS_SERVER = "https://fake.test"
# just enough to tell pysaml2 where to redirect to
SAML_SERVER = "https://test.saml.server/idp/sso"
TEST_SAML_METADATA = """
<md:EntityDescriptor xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata">
<md:IDPSSODescriptor protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
<md:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="%(SAML_SERVER)s"/>
</md:IDPSSODescriptor>
</md:EntityDescriptor>
""" % {
"SAML_SERVER": SAML_SERVER,
}
LOGIN_URL = b"/_matrix/client/r0/login" LOGIN_URL = b"/_matrix/client/r0/login"
TEST_URL = b"/_matrix/client/r0/account/whoami" TEST_URL = b"/_matrix/client/r0/account/whoami"
@ -461,10 +497,8 @@ class CASTestCase(unittest.HomeserverTestCase):
self.assertIn(b"SSO account deactivated", channel.result["body"]) self.assertIn(b"SSO account deactivated", channel.result["body"])
@skip_unless(HAS_JWT, "requires jwt")
class JWTTestCase(unittest.HomeserverTestCase): class JWTTestCase(unittest.HomeserverTestCase):
if not jwt:
skip = "requires jwt"
servlets = [ servlets = [
synapse.rest.admin.register_servlets_for_client_rest_resource, synapse.rest.admin.register_servlets_for_client_rest_resource,
login.register_servlets, login.register_servlets,
@ -480,17 +514,17 @@ class JWTTestCase(unittest.HomeserverTestCase):
self.hs.config.jwt_algorithm = self.jwt_algorithm self.hs.config.jwt_algorithm = self.jwt_algorithm
return self.hs return self.hs
def jwt_encode(self, token: str, secret: str = jwt_secret) -> str: def jwt_encode(self, payload: Dict[str, Any], secret: str = jwt_secret) -> str:
# PyJWT 2.0.0 changed the return type of jwt.encode from bytes to str. # PyJWT 2.0.0 changed the return type of jwt.encode from bytes to str.
result = jwt.encode(token, secret, self.jwt_algorithm) result = jwt.encode(
payload, secret, self.jwt_algorithm
) # type: Union[str, bytes]
if isinstance(result, bytes): if isinstance(result, bytes):
return result.decode("ascii") return result.decode("ascii")
return result return result
def jwt_login(self, *args): def jwt_login(self, *args):
params = json.dumps( params = {"type": "org.matrix.login.jwt", "token": self.jwt_encode(*args)}
{"type": "org.matrix.login.jwt", "token": self.jwt_encode(*args)}
)
channel = self.make_request(b"POST", LOGIN_URL, params) channel = self.make_request(b"POST", LOGIN_URL, params)
return channel return channel
@ -622,7 +656,7 @@ class JWTTestCase(unittest.HomeserverTestCase):
) )
def test_login_no_token(self): def test_login_no_token(self):
params = json.dumps({"type": "org.matrix.login.jwt"}) params = {"type": "org.matrix.login.jwt"}
channel = self.make_request(b"POST", LOGIN_URL, params) channel = self.make_request(b"POST", LOGIN_URL, params)
self.assertEqual(channel.result["code"], b"403", channel.result) self.assertEqual(channel.result["code"], b"403", channel.result)
self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN") self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")
@ -632,10 +666,8 @@ class JWTTestCase(unittest.HomeserverTestCase):
# The JWTPubKeyTestCase is a complement to JWTTestCase where we instead use # The JWTPubKeyTestCase is a complement to JWTTestCase where we instead use
# RSS256, with a public key configured in synapse as "jwt_secret", and tokens # RSS256, with a public key configured in synapse as "jwt_secret", and tokens
# signed by the private key. # signed by the private key.
@skip_unless(HAS_JWT, "requires jwt")
class JWTPubKeyTestCase(unittest.HomeserverTestCase): class JWTPubKeyTestCase(unittest.HomeserverTestCase):
if not jwt:
skip = "requires jwt"
servlets = [ servlets = [
login.register_servlets, login.register_servlets,
] ]
@ -692,17 +724,15 @@ class JWTPubKeyTestCase(unittest.HomeserverTestCase):
self.hs.config.jwt_algorithm = "RS256" self.hs.config.jwt_algorithm = "RS256"
return self.hs return self.hs
def jwt_encode(self, token: str, secret: str = jwt_privatekey) -> str: def jwt_encode(self, payload: Dict[str, Any], secret: str = jwt_privatekey) -> str:
# PyJWT 2.0.0 changed the return type of jwt.encode from bytes to str. # PyJWT 2.0.0 changed the return type of jwt.encode from bytes to str.
result = jwt.encode(token, secret, "RS256") result = jwt.encode(payload, secret, "RS256") # type: Union[bytes,str]
if isinstance(result, bytes): if isinstance(result, bytes):
return result.decode("ascii") return result.decode("ascii")
return result return result
def jwt_login(self, *args): def jwt_login(self, *args):
params = json.dumps( params = {"type": "org.matrix.login.jwt", "token": self.jwt_encode(*args)}
{"type": "org.matrix.login.jwt", "token": self.jwt_encode(*args)}
)
channel = self.make_request(b"POST", LOGIN_URL, params) channel = self.make_request(b"POST", LOGIN_URL, params)
return channel return channel