Support trying multiple localparts for OpenID Connect. (#8801)

Abstracts the SAML and OpenID Connect code which attempts to regenerate
the localpart of a matrix ID if it is already in use.
This commit is contained in:
Patrick Cloke 2020-11-25 10:04:22 -05:00 committed by GitHub
parent f38676d161
commit 4fd222ad70
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 331 additions and 137 deletions

View file

@ -89,6 +89,14 @@ class TestMappingProviderExtra(TestMappingProvider):
return {"phone": userinfo["phone"]}
class TestMappingProviderFailures(TestMappingProvider):
async def map_user_attributes(self, userinfo, token, failures):
return {
"localpart": userinfo["username"] + (str(failures) if failures else ""),
"display_name": None,
}
def simple_async_mock(return_value=None, raises=None):
# AsyncMock is not available in python3.5, this mimics part of its behaviour
async def cb(*args, **kwargs):
@ -152,6 +160,9 @@ class OidcHandlerTestCase(HomeserverTestCase):
self.render_error = Mock(return_value=None)
self.handler._sso_handler.render_error = self.render_error
# Reduce the number of attempts when generating MXIDs.
self.handler._sso_handler._MAP_USERNAME_RETRIES = 3
return hs
def metadata_edit(self, values):
@ -693,7 +704,10 @@ class OidcHandlerTestCase(HomeserverTestCase):
),
MappingException,
)
self.assertEqual(str(e.value), "mxid '@test_user_3:test' is already taken")
self.assertEqual(
str(e.value),
"Could not extract user attributes from SSO response: Mapping provider does not support de-duplicating Matrix IDs",
)
@override_config({"oidc_config": {"allow_existing_users": True}})
def test_map_userinfo_to_existing_user(self):
@ -703,6 +717,8 @@ class OidcHandlerTestCase(HomeserverTestCase):
self.get_success(
store.register_user(user_id=user.to_string(), password_hash=None)
)
# Map a user via SSO.
userinfo = {
"sub": "test",
"username": "test_user",
@ -715,6 +731,23 @@ class OidcHandlerTestCase(HomeserverTestCase):
)
self.assertEqual(mxid, "@test_user:test")
# Note that a second SSO user can be mapped to the same Matrix ID. (This
# requires a unique sub, but something that maps to the same matrix ID,
# in this case we'll just use the same username. A more realistic example
# would be subs which are email addresses, and mapping from the localpart
# of the email, e.g. bob@foo.com and bob@bar.com -> @bob:test.)
userinfo = {
"sub": "test1",
"username": "test_user",
}
token = {}
mxid = self.get_success(
self.handler._map_userinfo_to_user(
userinfo, token, "user-agent", "10.10.10.10"
)
)
self.assertEqual(mxid, "@test_user:test")
# Register some non-exact matching cases.
user2 = UserID.from_string("@TEST_user_2:test")
self.get_success(
@ -762,6 +795,7 @@ class OidcHandlerTestCase(HomeserverTestCase):
"username": "föö",
}
token = {}
e = self.get_failure(
self.handler._map_userinfo_to_user(
userinfo, token, "user-agent", "10.10.10.10"
@ -769,3 +803,55 @@ class OidcHandlerTestCase(HomeserverTestCase):
MappingException,
)
self.assertEqual(str(e.value), "localpart is invalid: föö")
@override_config(
{
"oidc_config": {
"user_mapping_provider": {
"module": __name__ + ".TestMappingProviderFailures"
}
}
}
)
def test_map_userinfo_to_user_retries(self):
"""The mapping provider can retry generating an MXID if the MXID is already in use."""
store = self.hs.get_datastore()
self.get_success(
store.register_user(user_id="@test_user:test", password_hash=None)
)
userinfo = {
"sub": "test",
"username": "test_user",
}
token = {}
mxid = self.get_success(
self.handler._map_userinfo_to_user(
userinfo, token, "user-agent", "10.10.10.10"
)
)
# test_user is already taken, so test_user1 gets registered instead.
self.assertEqual(mxid, "@test_user1:test")
# Register all of the potential users for a particular username.
self.get_success(
store.register_user(user_id="@tester:test", password_hash=None)
)
for i in range(1, 3):
self.get_success(
store.register_user(user_id="@tester%d:test" % i, password_hash=None)
)
# Now attempt to map to a username, this will fail since all potential usernames are taken.
userinfo = {
"sub": "tester",
"username": "tester",
}
e = self.get_failure(
self.handler._map_userinfo_to_user(
userinfo, token, "user-agent", "10.10.10.10"
),
MappingException,
)
self.assertEqual(
str(e.value), "Unable to generate a Matrix ID from the SSO response"
)