Refactor config to be an experimental feature

Also enforce you can't combine it with incompatible config options
This commit is contained in:
Hugh Nimmo-Smith 2023-05-09 16:20:04 +02:00 committed by Patrick Cloke
parent 03920bdd4e
commit 249f4a338d
18 changed files with 479 additions and 96 deletions

View File

@ -65,7 +65,7 @@ class PrivateKeyJWTWithKid(PrivateKeyJWT):
)
class OAuthDelegatedAuth(BaseAuth):
class MSC3861DelegatedAuth(BaseAuth):
AUTH_METHODS = {
"client_secret_post": encode_client_secret_post,
"client_secret_basic": encode_client_secret_basic,
@ -78,35 +78,38 @@ class OAuthDelegatedAuth(BaseAuth):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self._config = hs.config.auth
assert self._config.oauth_delegation_enabled, "OAuth delegation is not enabled"
assert self._config.oauth_delegation_issuer, "No issuer provided"
assert self._config.oauth_delegation_client_id, "No client_id provided"
assert self._config.oauth_delegation_client_secret, "No client_secret provided"
assert (
self._config.oauth_delegation_client_auth_method
in OAuthDelegatedAuth.AUTH_METHODS
), "Invalid client_auth_method"
self._config = hs.config.experimental.msc3861
auth_method = MSC3861DelegatedAuth.AUTH_METHODS.get(
self._config.client_auth_method.value, None
)
# Those assertions are already checked when parsing the config
assert self._config.enabled, "OAuth delegation is not enabled"
assert self._config.issuer, "No issuer provided"
assert self._config.client_id, "No client_id provided"
assert auth_method is not None, "Invalid client_auth_method provided"
self._http_client = hs.get_proxied_http_client()
self._hostname = hs.hostname
self._issuer_metadata = RetryOnExceptionCachedCall(self._load_metadata)
secret = self._config.oauth_delegation_client_secret
if isinstance(auth_method, PrivateKeyJWTWithKid):
# Use the JWK as the client secret when using the private_key_jwt method
assert self._config.jwk, "No JWK provided"
self._client_auth = ClientAuth(
self._config.oauth_delegation_client_id,
secret,
OAuthDelegatedAuth.AUTH_METHODS[
self._config.oauth_delegation_client_auth_method
],
self._config.client_id, self._config.jwk, auth_method
)
else:
# Else use the client secret
assert self._config.client_secret, "No client_secret provided"
self._client_auth = ClientAuth(
self._config.client_id, self._config.client_secret, auth_method
)
async def _load_metadata(self) -> OpenIDProviderMetadata:
if self._config.oauth_delegation_issuer_metadata is not None:
return OpenIDProviderMetadata(
**self._config.oauth_delegation_issuer_metadata
)
url = get_well_known_url(self._config.oauth_delegation_issuer, external=True)
if self._config.issuer_metadata is not None:
return OpenIDProviderMetadata(**self._config.issuer_metadata)
url = get_well_known_url(self._config.issuer, external=True)
response = await self._http_client.get_json(url)
metadata = OpenIDProviderMetadata(**response)
# metadata.validate_introspection_endpoint()
@ -203,7 +206,7 @@ class OAuthDelegatedAuth(BaseAuth):
)
user_id_str = await self.store.get_user_by_external_id(
OAuthDelegatedAuth.EXTERNAL_ID_PROVIDER, sub
MSC3861DelegatedAuth.EXTERNAL_ID_PROVIDER, sub
)
if user_id_str is None:
# If we could not find a user via the external_id, it either does not exist,
@ -236,7 +239,7 @@ class OAuthDelegatedAuth(BaseAuth):
# And record the sub as external_id
await self.store.record_user_external_id(
OAuthDelegatedAuth.EXTERNAL_ID_PROVIDER, sub, user_id.to_string()
MSC3861DelegatedAuth.EXTERNAL_ID_PROVIDER, sub, user_id.to_string()
)
else:
user_id = UserID.from_string(user_id_str)

View File

@ -14,11 +14,9 @@
# limitations under the License.
from typing import Any
from authlib.jose.rfc7517 import JsonWebKey
from synapse.types import JsonDict
from ._base import Config, ConfigError
from ._base import Config
class AuthConfig(Config):
@ -31,7 +29,14 @@ class AuthConfig(Config):
if password_config is None:
password_config = {}
passwords_enabled = password_config.get("enabled", True)
# The default value of password_config.enabled is True, unless msc3861 is enabled.
msc3861_enabled = (
config.get("experimental_features", {})
.get("msc3861", {})
.get("enabled", False)
)
passwords_enabled = password_config.get("enabled", not msc3861_enabled)
# 'only_for_reauth' allows users who have previously set a password to use it,
# even though passwords would otherwise be disabled.
passwords_for_reauth_only = passwords_enabled == "only_for_reauth"
@ -55,29 +60,3 @@ class AuthConfig(Config):
self.ui_auth_session_timeout = self.parse_duration(
ui_auth.get("session_timeout", 0)
)
oauth_delegation = config.get("oauth_delegation", {})
self.oauth_delegation_enabled = oauth_delegation.get("enabled", False)
self.oauth_delegation_issuer = oauth_delegation.get("issuer", "")
self.oauth_delegation_issuer_metadata = oauth_delegation.get("issuer_metadata")
self.oauth_delegation_account = oauth_delegation.get("account", "")
self.oauth_delegation_client_id = oauth_delegation.get("client_id", "")
self.oauth_delegation_client_secret = oauth_delegation.get("client_secret", "")
self.oauth_delegation_client_auth_method = oauth_delegation.get(
"client_auth_method", "client_secret_post"
)
self.password_enabled = password_config.get(
"enabled", not self.oauth_delegation_enabled
)
if self.oauth_delegation_client_auth_method == "private_key_jwt":
self.oauth_delegation_client_secret = JsonWebKey.import_key(
self.oauth_delegation_client_secret
)
# If we are delegating via OAuth then password cannot be supported as well
if self.oauth_delegation_enabled and self.password_enabled:
raise ConfigError(
"Password auth cannot be enabled when OAuth delegation is enabled"
)

View File

@ -12,15 +12,196 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Optional
import enum
from typing import TYPE_CHECKING, Any, Optional
import attr
import attr.validators
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
from synapse.config import ConfigError
from synapse.config._base import Config
from synapse.config._base import Config, RootConfig
from synapse.types import JsonDict
# Determine whether authlib is installed.
try:
import authlib # noqa: F401
HAS_AUTHLIB = True
except ImportError:
HAS_AUTHLIB = False
if TYPE_CHECKING:
# Only import this if we're type checking, as it might not be installed at runtime.
from authlib.jose.rfc7517 import JsonWebKey
class ClientAuthMethod(enum.Enum):
"""List of supported client auth methods."""
CLIENT_SECRET_POST = "client_secret_post"
CLIENT_SECRET_BASIC = "client_secret_basic"
CLIENT_SECRET_JWT = "client_secret_jwt"
PRIVATE_KEY_JWT = "private_key_jwt"
def _parse_jwks(jwks: Optional[JsonDict]) -> Optional["JsonWebKey"]:
"""A helper function to parse a JWK dict into a JsonWebKey."""
if jwks is None:
return None
from authlib.jose.rfc7517 import JsonWebKey
return JsonWebKey.import_key(jwks)
@attr.s(slots=True, frozen=True)
class MSC3861:
"""Configuration for MSC3861: Matrix architecture change to delegate authentication via OIDC"""
enabled: bool = attr.ib(default=False, validator=attr.validators.instance_of(bool))
"""Whether to enable MSC3861 auth delegation."""
@enabled.validator
def _check_enabled(self, attribute: attr.Attribute, value: bool) -> None:
# Only allow enabling MSC3861 if authlib is installed
if value and not HAS_AUTHLIB:
raise ConfigError(
"MSC3861 is enabled but authlib is not installed. "
"Please install authlib to use MSC3861."
)
issuer: str = attr.ib(default="", validator=attr.validators.instance_of(str))
"""The URL of the OIDC Provider."""
issuer_metadata: Optional[JsonDict] = attr.ib(default=None)
"""The issuer metadata to use, otherwise discovered from /.well-known/openid-configuration as per MSC2965."""
client_id: str = attr.ib(
default="",
validator=attr.validators.instance_of(str),
)
"""The client ID to use when calling the introspection endpoint."""
client_auth_method: ClientAuthMethod = attr.ib(
default=ClientAuthMethod.CLIENT_SECRET_POST, converter=ClientAuthMethod
)
"""The auth method used when calling the introspection endpoint."""
client_secret: Optional[str] = attr.ib(
default=None,
validator=attr.validators.optional(attr.validators.instance_of(str)),
)
"""
The client secret to use when calling the introspection endpoint,
when using any of the client_secret_* client auth methods.
"""
jwk: Optional["JsonWebKey"] = attr.ib(default=None, converter=_parse_jwks)
"""
The JWKS to use when calling the introspection endpoint,
when using the private_key_jwt client auth method.
"""
@client_auth_method.validator
def _check_client_auth_method(
self, attribute: attr.Attribute, value: ClientAuthMethod
) -> None:
# Check that the right client credentials are provided for the client auth method.
if not self.enabled:
return
if value == ClientAuthMethod.PRIVATE_KEY_JWT and self.jwk is None:
raise ConfigError(
"A JWKS must be provided when using the private_key_jwt client auth method"
)
if (
value
in (
ClientAuthMethod.CLIENT_SECRET_POST,
ClientAuthMethod.CLIENT_SECRET_BASIC,
ClientAuthMethod.CLIENT_SECRET_JWT,
)
and self.client_secret is None
):
raise ConfigError(
f"A client secret must be provided when using the {value} client auth method"
)
account_management_url: Optional[str] = attr.ib(
default=None,
validator=attr.validators.optional(attr.validators.instance_of(str)),
)
"""The URL of the My Account page on the OIDC Provider as per MSC2965."""
def check_config_conflicts(self, root: RootConfig) -> None:
"""Checks for any configuration conflicts with other parts of Synapse.
Raises:
ConfigError: If there are any configuration conflicts.
"""
if not self.enabled:
return
if (
root.auth.password_enabled_for_reauth
or root.auth.password_enabled_for_login
):
raise ConfigError(
"Password auth cannot be enabled when OAuth delegation is enabled"
)
if root.registration.enable_registration:
raise ConfigError(
"Registration cannot be enabled when OAuth delegation is enabled"
)
if (
root.oidc.oidc_enabled
or root.saml2.saml2_enabled
or root.cas.cas_enabled
or root.jwt.jwt_enabled
):
raise ConfigError("SSO cannot be enabled when OAuth delegation is enabled")
if bool(root.authproviders.password_providers):
raise ConfigError(
"Password auth providers cannot be enabled when OAuth delegation is enabled"
)
if root.captcha.enable_registration_captcha:
raise ConfigError(
"CAPTCHA cannot be enabled when OAuth delegation is enabled"
)
if root.experimental.msc3882_enabled:
raise ConfigError(
"MSC3882 cannot be enabled when OAuth delegation is enabled"
)
if root.registration.refresh_token_lifetime:
raise ConfigError(
"refresh_token_lifetime cannot be set when OAuth delegation is enabled"
)
if root.registration.nonrefreshable_access_token_lifetime:
raise ConfigError(
"nonrefreshable_access_token_lifetime cannot be set when OAuth delegation is enabled"
)
if root.registration.session_lifetime:
raise ConfigError(
"session_lifetime cannot be set when OAuth delegation is enabled"
)
if not root.experimental.msc3970_enabled:
raise ConfigError(
"experimental_features.msc3970_enabled must be 'true' when OAuth delegation is enabled"
)
@attr.s(auto_attribs=True, frozen=True, slots=True)
class MSC3866Config:
@ -182,8 +363,14 @@ class ExperimentalConfig(Config):
"msc3981_recurse_relations", False
)
# MSC3861: Matrix architecture change to delegate authentication via OIDC
self.msc3861 = MSC3861(**experimental.get("msc3861", {}))
# MSC3970: Scope transaction IDs to devices
self.msc3970_enabled = experimental.get("msc3970_enabled", False)
self.msc3970_enabled = experimental.get("msc3970_enabled", self.msc3861.enabled)
# Check that none of the other config options conflict with MSC3861 when enabled
self.msc3861.check_config_conflicts(self.root)
# MSC4009: E.164 Matrix IDs
self.msc4009_e164_mxids = experimental.get("msc4009_e164_mxids", False)

View File

@ -274,7 +274,7 @@ class AuthHandler:
# response.
self._extra_attributes: Dict[str, SsoLoginExtraAttributes] = {}
self.oauth_delegation_enabled = hs.config.auth.oauth_delegation_enabled
self.msc3861_oauth_delegation_enabled = hs.config.experimental.msc3861.enabled
async def validate_user_via_ui_auth(
self,
@ -325,7 +325,7 @@ class AuthHandler:
LimitExceededError if the ratelimiter's failed request count for this
user is too high to proceed
"""
if self.oauth_delegation_enabled:
if self.msc3861_oauth_delegation_enabled:
raise SynapseError(
HTTPStatus.INTERNAL_SERVER_ERROR, "UIA shouldn't be used with MSC3861"
)

View File

@ -38,6 +38,7 @@ from twisted.web.resource import Resource
from synapse.api import errors
from synapse.api.errors import SynapseError
from synapse.config import ConfigError
from synapse.events import EventBase
from synapse.events.presence_router import (
GET_INTERESTED_USERS_CALLBACK,
@ -252,6 +253,7 @@ class ModuleApi:
self._device_handler = hs.get_device_handler()
self.custom_template_dir = hs.config.server.custom_template_directory
self._callbacks = hs.get_module_api_callbacks()
self.msc3861_oauth_delegation_enabled = hs.config.experimental.msc3861.enabled
try:
app_name = self._hs.config.email.email_app_name
@ -419,6 +421,11 @@ class ModuleApi:
Added in Synapse v1.46.0.
"""
if self.msc3861_oauth_delegation_enabled:
raise ConfigError(
"Cannot use password auth provider callbacks when OAuth delegation is enabled"
)
return self._password_auth_provider.register_password_auth_provider_callbacks(
check_3pid_auth=check_3pid_auth,
on_logged_out=on_logged_out,

View File

@ -601,7 +601,7 @@ class ThreepidRestServlet(RestServlet):
# ThreePidBindRestServelet.PostBody with an `alias_generator` to handle
# `threePidCreds` versus `three_pid_creds`.
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
if self.hs.config.auth.oauth_delegation_enabled:
if self.hs.config.experimental.msc3861.enabled:
raise NotFoundError(errcode=Codes.UNRECOGNIZED)
if not self.hs.config.registration.enable_3pid_changes:
@ -894,7 +894,7 @@ class AccountStatusRestServlet(RestServlet):
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
if hs.config.worker.worker_app is None:
if not hs.config.auth.oauth_delegation_enabled:
if not hs.config.experimental.msc3861.enabled:
EmailPasswordRequestTokenRestServlet(hs).register(http_server)
DeactivateAccountRestServlet(hs).register(http_server)
PasswordRestServlet(hs).register(http_server)
@ -906,7 +906,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
if hs.config.worker.worker_app is None:
ThreepidBindRestServlet(hs).register(http_server)
ThreepidUnbindRestServlet(hs).register(http_server)
if not hs.config.auth.oauth_delegation_enabled:
if not hs.config.experimental.msc3861.enabled:
ThreepidAddRestServlet(hs).register(http_server)
ThreepidDeleteRestServlet(hs).register(http_server)
WhoamiRestServlet(hs).register(http_server)

View File

@ -135,7 +135,7 @@ class DeviceRestServlet(RestServlet):
self.device_handler = handler
self.auth_handler = hs.get_auth_handler()
self._msc3852_enabled = hs.config.experimental.msc3852_enabled
self.oauth_delegation_enabled = hs.config.auth.oauth_delegation_enabled
self._msc3861_oauth_delegation_enabled = hs.config.experimental.msc3861.enabled
async def on_GET(
self, request: SynapseRequest, device_id: str
@ -167,7 +167,7 @@ class DeviceRestServlet(RestServlet):
async def on_DELETE(
self, request: SynapseRequest, device_id: str
) -> Tuple[int, JsonDict]:
if self.oauth_delegation_enabled:
if self._msc3861_oauth_delegation_enabled:
raise UnrecognizedRequestError(code=404)
requester = await self.auth.get_user_by_req(request)
@ -350,7 +350,7 @@ class ClaimDehydratedDeviceServlet(RestServlet):
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
if (
hs.config.worker.worker_app is None
and not hs.config.auth.oauth_delegation_enabled
and not hs.config.experimental.msc3861.enabled
):
DeleteDevicesRestServlet(hs).register(http_server)
DevicesRestServlet(hs).register(http_server)

View File

@ -386,7 +386,7 @@ class SigningKeyUploadServlet(RestServlet):
# time. Because there is no UIA in MSC3861, for now we throw an error if the
# user tries to reset the device signing key when MSC3861 is enabled, but allow
# first-time setup.
if self.hs.config.auth.oauth_delegation_enabled:
if self.hs.config.experimental.msc3861.enabled:
# There is no way to reset the device signing key with MSC3861
if is_cross_signing_setup:
raise SynapseError(

View File

@ -633,7 +633,7 @@ class CasTicketServlet(RestServlet):
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
if hs.config.auth.oauth_delegation_enabled:
if hs.config.experimental.msc3861.enabled:
return
LoginRestServlet(hs).register(http_server)

View File

@ -80,7 +80,7 @@ class LogoutAllRestServlet(RestServlet):
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
if hs.config.auth.oauth_delegation_enabled:
if hs.config.experimental.msc3861.enabled:
return
LogoutRestServlet(hs).register(http_server)

View File

@ -955,7 +955,7 @@ def _calculate_registration_flows(
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
if hs.config.auth.oauth_delegation_enabled:
if hs.config.experimental.msc3861.enabled:
return
if hs.config.worker.worker_app is None:

View File

@ -47,7 +47,7 @@ def build_synapse_client_resource_tree(hs: "HomeServer") -> Mapping[str, Resourc
}
# Expose the JWKS endpoint if OAuth2 delegation is enabled
if hs.config.auth.oauth_delegation_enabled:
if hs.config.experimental.msc3861.enabled:
from synapse.rest.synapse.client.jwks import JwksResource
resources["/_synapse/jwks"] = JwksResource(hs)

View File

@ -26,8 +26,6 @@ logger = logging.getLogger(__name__)
class JwksResource(DirectServeJsonResource):
def __init__(self, hs: "HomeServer"):
from authlib.jose.rfc7517 import Key
super().__init__(extract_context=True)
# Parameters that are allowed to be exposed in the public key.
@ -53,10 +51,10 @@ class JwksResource(DirectServeJsonResource):
"ext",
}
secret = hs.config.auth.oauth_delegation_client_secret
key = hs.config.experimental.msc3861.jwk
if isinstance(secret, Key):
private_key = secret.as_dict()
if key is not None:
private_key = key.as_dict()
public_key = {
k: v for k, v in private_key.items() if k in public_parameters
}

View File

@ -44,14 +44,15 @@ class WellKnownBuilder:
"base_url": self._config.registration.default_identity_server
}
if self._config.auth.oauth_delegation_enabled:
# We use the MSC3861 values as they are used by multiple MSCs
if self._config.experimental.msc3861.enabled:
result["org.matrix.msc2965.authentication"] = {
"issuer": self._config.auth.oauth_delegation_issuer
"issuer": self._config.experimental.msc3861.issuer
}
if self._config.auth.oauth_delegation_account != "":
if self._config.experimental.msc3861.account_management_url is not None:
result["org.matrix.msc2965.authentication"][
"account"
] = self._config.auth.oauth_delegation_account
] = self._config.experimental.msc3861.account_management_url
if self._config.server.extra_well_known_client_content:
for (

View File

@ -428,10 +428,10 @@ class HomeServer(metaclass=abc.ABCMeta):
@cache_in_self
def get_auth(self) -> Auth:
if self.config.auth.oauth_delegation_enabled:
from synapse.api.auth.oauth_delegated import OAuthDelegatedAuth
if self.config.experimental.msc3861.enabled:
from synapse.api.auth.msc3861_delegated import MSC3861DelegatedAuth
return OAuthDelegatedAuth(self)
return MSC3861DelegatedAuth(self)
return InternalAuth(self)
@cache_in_self

View File

@ -0,0 +1,202 @@
# Copyright 2023 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict
from unittest.mock import Mock
from synapse.config import ConfigError
from synapse.module_api import ModuleApi
from synapse.types import JsonDict
from tests.server import get_clock
from tests.unittest import HomeserverTestCase, override_config, skip_unless
try:
import authlib # noqa: F401
HAS_AUTHLIB = True
except ImportError:
HAS_AUTHLIB = False
# These are a few constants that are used as config parameters in the tests.
SERVER_NAME = "test"
ISSUER = "https://issuer/"
CLIENT_ID = "test-client-id"
CLIENT_SECRET = "test-client-secret"
BASE_URL = "https://synapse/"
class CustomAuthModule:
"""A module which registers a password auth provider."""
@staticmethod
def parse_config(config: JsonDict) -> None:
pass
def __init__(self, config: None, api: ModuleApi):
api.register_password_auth_provider_callbacks(
auth_checkers={("m.login.password", ("password",)): Mock()},
)
@skip_unless(HAS_AUTHLIB, "requires authlib")
class MSC3861OAuthDelegation(HomeserverTestCase):
"""Test that the Homeserver fails to initialize if the config is invalid."""
def setUp(self) -> None:
self.reactor, self.clock = get_clock()
self._hs_args = {"clock": self.clock, "reactor": self.reactor}
def default_config(self) -> Dict[str, Any]:
config = super().default_config()
config["public_baseurl"] = BASE_URL
if "experimental_features" not in config:
config["experimental_features"] = {}
config["experimental_features"]["msc3861"] = {
"enabled": True,
"issuer": ISSUER,
"client_id": CLIENT_ID,
"client_auth_method": "client_secret_post",
"client_secret": CLIENT_SECRET,
}
return config
def test_registration_cannot_be_enabled(self) -> None:
with self.assertRaises(ConfigError):
self.setup_test_homeserver()
@override_config(
{
"enable_registration": False,
"password_config": {
"enabled": True,
},
}
)
def test_password_config_cannot_be_enabled(self) -> None:
with self.assertRaises(ConfigError):
self.setup_test_homeserver()
@override_config(
{
"enable_registration": False,
"oidc_providers": [
{
"idp_id": "microsoft",
"idp_name": "Microsoft",
"issuer": "https://login.microsoftonline.com/<tenant id>/v2.0",
"client_id": "<client id>",
"client_secret": "<client secret>",
"scopes": ["openid", "profile"],
"authorization_endpoint": "https://login.microsoftonline.com/<tenant id>/oauth2/v2.0/authorize",
"token_endpoint": "https://login.microsoftonline.com/<tenant id>/oauth2/v2.0/token",
"userinfo_endpoint": "https://graph.microsoft.com/oidc/userinfo",
}
],
}
)
def test_oidc_sso_cannot_be_enabled(self) -> None:
with self.assertRaises(ConfigError):
self.setup_test_homeserver()
@override_config(
{
"enable_registration": False,
"cas_config": {
"enabled": True,
"server_url": "https://cas-server.com",
"displayname_attribute": "name",
"required_attributes": {"userGroup": "staff", "department": "None"},
},
}
)
def test_cas_sso_cannot_be_enabled(self) -> None:
with self.assertRaises(ConfigError):
self.setup_test_homeserver()
@override_config(
{
"enable_registration": False,
"modules": [
{
"module": f"{__name__}.{CustomAuthModule.__qualname__}",
"config": {},
}
],
}
)
def test_auth_providers_cannot_be_enabled(self) -> None:
with self.assertRaises(ConfigError):
self.setup_test_homeserver()
@override_config(
{
"enable_registration": False,
"jwt_config": {
"enabled": True,
"secret": "my-secret-token",
"algorithm": "HS256",
},
}
)
def test_jwt_auth_cannot_be_enabled(self) -> None:
with self.assertRaises(ConfigError):
self.setup_test_homeserver()
@override_config(
{
"enable_registration": False,
"experimental_features": {
"msc3882_enabled": True,
},
}
)
def test_msc3882_auth_cannot_be_enabled(self) -> None:
with self.assertRaises(ConfigError):
self.setup_test_homeserver()
@override_config(
{
"enable_registration": False,
"recaptcha_public_key": "test",
"recaptcha_private_key": "test",
"enable_registration_captcha": True,
}
)
def test_captcha_cannot_be_enabled(self) -> None:
with self.assertRaises(ConfigError):
self.setup_test_homeserver()
@override_config(
{
"enable_registration": False,
"refresh_token_lifetime": "24h",
"refreshable_access_token_lifetime": "10m",
"nonrefreshable_access_token_lifetime": "24h",
}
)
def test_refreshable_tokens_cannot_be_enabled(self) -> None:
with self.assertRaises(ConfigError):
self.setup_test_homeserver()
@override_config(
{
"enable_registration": False,
"session_lifetime": "24h",
}
)
def test_session_lifetime_cannot_be_set(self) -> None:
with self.assertRaises(ConfigError):
self.setup_test_homeserver()

View File

@ -109,13 +109,16 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
def default_config(self) -> Dict[str, Any]:
config = super().default_config()
config["public_baseurl"] = BASE_URL
config["oauth_delegation"] = {
config["disable_registration"] = True
config["experimental_features"] = {
"msc3861": {
"enabled": True,
"issuer": ISSUER,
"client_id": CLIENT_ID,
"client_auth_method": "client_secret_post",
"client_secret": CLIENT_SECRET,
}
}
return config
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:

View File

@ -108,14 +108,17 @@ class WellKnownTests(unittest.HomeserverTestCase):
@unittest.override_config(
{
"public_baseurl": "https://homeserver", # this is only required so that client well known is served
"oauth_delegation": {
"experimental_features": {
"msc3861": {
"enabled": True,
"issuer": "https://issuer",
"account": "https://my-account.issuer",
"account_management_url": "https://my-account.issuer",
"client_id": "id",
"client_auth_method": "client_secret_post",
"client_secret": "secret",
},
},
"disable_registration": True,
}
)
def test_client_well_known_msc3861_oauth_delegation(self) -> None: