diff --git a/data/users/externaloidc.py b/data/users/externaloidc.py index 5cdac8a11a..6202d53d3b 100644 --- a/data/users/externaloidc.py +++ b/data/users/externaloidc.py @@ -39,6 +39,12 @@ def check_group_lookup_args(self, group_lookup_args, disable_pagination=False): """ return (True, None) + def get_user(self, username_or_email): + """ + No way to look up a username or email in OIDC so returning None + """ + return (None, "Currently user lookup is not supported with OIDC") + def query_users(self, query, limit): """ No way to query users so returning empty list diff --git a/endpoints/oauth/test/test_login.py b/endpoints/oauth/test/test_login.py index 0780caa787..41645311e9 100644 --- a/endpoints/oauth/test/test_login.py +++ b/endpoints/oauth/test/test_login.py @@ -1,14 +1,14 @@ -from test.analytics import analytics -from test.fixtures import * -from test.test_ldap import mock_ldap - import pytest from mock import patch from data import database, model -from data.users import DatabaseUsers, get_users_handler +from data.users import DatabaseUsers, UserAuthentication, get_users_handler from endpoints.oauth.login import _conduct_oauth_login +from oauth.oidc import OIDCLoginService from oauth.services.github import GithubOAuthService +from test.analytics import analytics +from test.fixtures import * +from test.test_ldap import mock_ldap @pytest.fixture(params=[None, "username", "email"]) @@ -20,18 +20,45 @@ def login_service(request, app): return GithubOAuthService(config, "GITHUB") -@pytest.fixture(params=["Database", "LDAP"]) +@pytest.fixture() +def oidc_login_service(app): + config = { + "REGISTRY_TITLE_SHORT": "quay-test", + "TESTING": True, + "OIDC_LOGIN_CONFIG": { + "CLIENT_ID": "foo", + "CLIENT_SECRET": "bar", + "SERVICE_NAME": "Test Service", + "OIDC_SERVER": "http://server-hostname/realms/server-realm/", + "DEBUGGING": True, + }, + } + + return OIDCLoginService(config, "OIDC_LOGIN_CONFIG") + + +@pytest.fixture(params=["Database", "LDAP", "OIDC"]) def auth_system(request): return _get_users_handler(request.param) def _get_users_handler(auth_type): config = {} - config["AUTHENTICATION_TYPE"] = auth_type - config["LDAP_BASE_DN"] = ["dc=quay", "dc=io"] - config["LDAP_ADMIN_DN"] = "uid=testy,ou=employees,dc=quay,dc=io" - config["LDAP_ADMIN_PASSWD"] = "password" - config["LDAP_USER_RDN"] = ["ou=employees"] + if auth_type == "LDAP": + config["AUTHENTICATION_TYPE"] = auth_type + config["LDAP_BASE_DN"] = ["dc=quay", "dc=io"] + config["LDAP_ADMIN_DN"] = "uid=testy,ou=employees,dc=quay,dc=io" + config["LDAP_ADMIN_PASSWD"] = "password" + config["LDAP_USER_RDN"] = ["ou=employees"] + + if auth_type == "OIDC": + config["AUTHENTICATION_TYPE"] = auth_type + config["CLIENT_ID"] = "foo" + config["CLIENT_SECRET"] = "bar" + config["OIDC_SERVER"] = "http://server-hostname/realms/server-realm/" + config["SERVICE_NAME"] = "Test Service" + config["LOGIN_SCOPES"] = ["openid", "roles"] + config["PREFERRED_GROUP_CLAIM_NAME"] = "groups" return get_users_handler(config, None, None) @@ -268,3 +295,156 @@ def test_existing_account_in_ldap(app): result.user_obj, internal_auth.federated_service ) assert internal_login is not None + + +@pytest.mark.parametrize( + "binding_field, lid, lusername, lemail, additional_login_info, expected_error", + [ + # No binding field + newly seen user -> New unlinked user + (None, "someid", "someunknownuser", "someemail@example.com", None, None), + # No binding field + newly seen user with additional login info -> New unlinked user + ( + None, + "someid", + "someunknownuser", + "someemail@example.com", + {"group": "information", "is": "here"}, + None, + ), + # sub binding field + unknown sub -> Error. + ( + "sub", + "someid", + "someuser", + "foo@bar.com", + None, + "sub someid not found in backing auth system", + ), + # username binding field + unknown username -> Error. + ( + "username", + "someid", + "someunknownuser", + "foo@bar.com", + None, + "username someunknownuser not found in backing auth system", + ), + # email binding field + unknown email address -> Error. + ( + "email", + "someid", + "someuser", + "someemail@example.com", + None, + "email someemail@example.com not found in backing auth system", + ), + # No binding field + newly seen user -> New unlinked user. + (None, "someid", "someuser", "foo@bar.com", None, None), + # username binding field + valid username -> fully bound user. + ( + "username", + "someid", + "someuser", + "foo@bar.com", + None, + "username someuser not found in backing auth system", + ), + # sub binding field + valid sub -> fully bound user. + ( + "sub", + "someuser", + "someusername", + "foo@bar.com", + None, + "sub someuser not found in backing auth system", + ), + # email binding field + valid email -> fully bound user. + ( + "email", + "someid", + "someuser", + "foo@bar.com", + None, + "email foo@bar.com not found in backing auth system", + ), + ], +) +def test_new_account_via_oidc( + binding_field, lid, lusername, lemail, additional_login_info, expected_error, app +): + existing_user_count = database.User.select().count() + + config = {"GITHUB": {}} + if binding_field is not None: + config["GITHUB"]["LOGIN_BINDING_FIELD"] = binding_field + + external_auth = GithubOAuthService(config, "GITHUB") + internal_auth = _get_users_handler("OIDC") + + result = _conduct_oauth_login( + app.config, + analytics, + internal_auth, + external_auth, + lid, + lusername, + lemail, + additional_login_info, + ) + assert result.error_message == expected_error + + current_user_count = database.User.select().count() + if expected_error is None: + assert current_user_count == existing_user_count + 1 + assert result.user_obj is not None + + # Check the service bindings. + external_login = model.user.lookup_federated_login( + result.user_obj, external_auth.service_id() + ) + assert external_login is not None + + internal_login = model.user.lookup_federated_login( + result.user_obj, internal_auth.federated_service + ) + if binding_field is not None: + assert internal_login is not None + else: + assert internal_login is None + else: + # Ensure that no additional users were created. + assert current_user_count == existing_user_count + + +def test_existing_account_in_oidc(app, oidc_login_service): + # Add an existing federated user bound to the OIDC account associated with `someuser`. + bound_user = model.user.create_federated_user( + "someuser", "foo@bar.com", oidc_login_service.service_id(), "someuser", False + ) + existing_user_count = database.User.select().count() + + # Conduct OAuth login with the same lid and bound field. + result = _conduct_oauth_login( + app.config, + analytics, + UserAuthentication, + oidc_login_service, + "someuser", + bound_user.username, + bound_user.email, + ) + assert result.error_message is None + + # Ensure that the same user was returned, and that it is now bound to the Github account + # as well. + assert result.user_obj.id == bound_user.id + + # Ensure that no additional users were created. + current_user_count = database.User.select().count() + assert current_user_count == existing_user_count + + # Check the service bindings. + external_login = model.user.lookup_federated_login( + result.user_obj, oidc_login_service.service_id() + ) + assert external_login is not None