mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-01-13 22:39:26 -05:00
ee382025b0
De-duplicates code between the SAML and OIDC implementations.
91 lines
3.1 KiB
Python
91 lines
3.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import logging
|
|
from typing import TYPE_CHECKING, Optional
|
|
|
|
from synapse.handlers._base import BaseHandler
|
|
from synapse.http.server import respond_with_html
|
|
|
|
if TYPE_CHECKING:
|
|
from synapse.server import HomeServer
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MappingException(Exception):
|
|
"""Used to catch errors when mapping the UserInfo object
|
|
"""
|
|
|
|
|
|
class SsoHandler(BaseHandler):
|
|
def __init__(self, hs: "HomeServer"):
|
|
super().__init__(hs)
|
|
self._error_template = hs.config.sso_error_template
|
|
|
|
def render_error(
|
|
self, request, error: str, error_description: Optional[str] = None
|
|
) -> None:
|
|
"""Renders the error template and responds with it.
|
|
|
|
This is used to show errors to the user. The template of this page can
|
|
be found under `synapse/res/templates/sso_error.html`.
|
|
|
|
Args:
|
|
request: The incoming request from the browser.
|
|
We'll respond with an HTML page describing the error.
|
|
error: A technical identifier for this error.
|
|
error_description: A human-readable description of the error.
|
|
"""
|
|
html = self._error_template.render(
|
|
error=error, error_description=error_description
|
|
)
|
|
respond_with_html(request, 400, html)
|
|
|
|
async def get_sso_user_by_remote_user_id(
|
|
self, auth_provider_id: str, remote_user_id: str
|
|
) -> Optional[str]:
|
|
"""
|
|
Maps the user ID of a remote IdP to a mxid for a previously seen user.
|
|
|
|
If the user has not been seen yet, this will return None.
|
|
|
|
Args:
|
|
auth_provider_id: A unique identifier for this SSO provider, e.g.
|
|
"oidc" or "saml".
|
|
remote_user_id: The user ID according to the remote IdP. This might
|
|
be an e-mail address, a GUID, or some other form. It must be
|
|
unique and immutable.
|
|
|
|
Returns:
|
|
The mxid of a previously seen user.
|
|
"""
|
|
# Check if we already have a mapping for this user.
|
|
logger.info(
|
|
"Looking for existing mapping for user %s:%s",
|
|
auth_provider_id,
|
|
remote_user_id,
|
|
)
|
|
previously_registered_user_id = await self.store.get_user_by_external_id(
|
|
auth_provider_id, remote_user_id,
|
|
)
|
|
|
|
# A match was found, return the user ID.
|
|
if previously_registered_user_id is not None:
|
|
logger.info("Found existing mapping %s", previously_registered_user_id)
|
|
return previously_registered_user_id
|
|
|
|
# No match.
|
|
return None
|