forked from quay/quay
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
962f778
commit d95e681
Showing
2 changed files
with
197 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,14 @@ | ||
from test.analytics import analytics | ||
from test.fixtures import * | ||
from test.test_ldap import mock_ldap | ||
|
||
import pytest | ||
from mock import patch | ||
|
||
from data import database, model | ||
from data.users import DatabaseUsers, get_users_handler | ||
from data.users import DatabaseUsers, UserAuthentication, get_users_handler | ||
from endpoints.oauth.login import _conduct_oauth_login | ||
from oauth.oidc import OIDCLoginService | ||
from oauth.services.github import GithubOAuthService | ||
from test.analytics import analytics | ||
from test.fixtures import * | ||
from test.test_ldap import mock_ldap | ||
|
||
|
||
@pytest.fixture(params=[None, "username", "email"]) | ||
|
@@ -20,18 +20,45 @@ def login_service(request, app): | |
return GithubOAuthService(config, "GITHUB") | ||
|
||
|
||
@pytest.fixture(params=["Database", "LDAP"]) | ||
@pytest.fixture() | ||
def oidc_login_service(app): | ||
config = { | ||
"REGISTRY_TITLE_SHORT": "quay-test", | ||
"TESTING": True, | ||
"OIDC_LOGIN_CONFIG": { | ||
"CLIENT_ID": "foo", | ||
"CLIENT_SECRET": "bar", | ||
"SERVICE_NAME": "Test Service", | ||
"OIDC_SERVER": "http://server-hostname/realms/server-realm/", | ||
"DEBUGGING": True, | ||
}, | ||
} | ||
|
||
return OIDCLoginService(config, "OIDC_LOGIN_CONFIG") | ||
|
||
|
||
@pytest.fixture(params=["Database", "LDAP", "OIDC"]) | ||
def auth_system(request): | ||
return _get_users_handler(request.param) | ||
|
||
|
||
def _get_users_handler(auth_type): | ||
config = {} | ||
config["AUTHENTICATION_TYPE"] = auth_type | ||
config["LDAP_BASE_DN"] = ["dc=quay", "dc=io"] | ||
config["LDAP_ADMIN_DN"] = "uid=testy,ou=employees,dc=quay,dc=io" | ||
config["LDAP_ADMIN_PASSWD"] = "password" | ||
config["LDAP_USER_RDN"] = ["ou=employees"] | ||
if auth_type == "LDAP": | ||
config["AUTHENTICATION_TYPE"] = auth_type | ||
config["LDAP_BASE_DN"] = ["dc=quay", "dc=io"] | ||
config["LDAP_ADMIN_DN"] = "uid=testy,ou=employees,dc=quay,dc=io" | ||
config["LDAP_ADMIN_PASSWD"] = "password" | ||
config["LDAP_USER_RDN"] = ["ou=employees"] | ||
|
||
if auth_type == "OIDC": | ||
config["AUTHENTICATION_TYPE"] = auth_type | ||
config["CLIENT_ID"] = "foo" | ||
config["CLIENT_SECRET"] = "bar" | ||
config["OIDC_SERVER"] = "http://server-hostname/realms/server-realm/" | ||
config["SERVICE_NAME"] = "Test Service" | ||
config["LOGIN_SCOPES"] = ["openid", "roles"] | ||
config["PREFERRED_GROUP_CLAIM_NAME"] = "groups" | ||
|
||
return get_users_handler(config, None, None) | ||
|
||
|
@@ -268,3 +295,156 @@ def test_existing_account_in_ldap(app): | |
result.user_obj, internal_auth.federated_service | ||
) | ||
assert internal_login is not None | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"binding_field, lid, lusername, lemail, additional_login_info, expected_error", | ||
[ | ||
# No binding field + newly seen user -> New unlinked user | ||
(None, "someid", "someunknownuser", "[email protected]", None, None), | ||
# No binding field + newly seen user with additional login info -> New unlinked user | ||
( | ||
None, | ||
"someid", | ||
"someunknownuser", | ||
"[email protected]", | ||
{"group": "information", "is": "here"}, | ||
None, | ||
), | ||
# sub binding field + unknown sub -> Error. | ||
( | ||
"sub", | ||
"someid", | ||
"someuser", | ||
"[email protected]", | ||
None, | ||
"sub someid not found in backing auth system", | ||
), | ||
# username binding field + unknown username -> Error. | ||
( | ||
"username", | ||
"someid", | ||
"someunknownuser", | ||
"[email protected]", | ||
None, | ||
"username someunknownuser not found in backing auth system", | ||
), | ||
# email binding field + unknown email address -> Error. | ||
( | ||
"email", | ||
"someid", | ||
"someuser", | ||
"[email protected]", | ||
None, | ||
"email [email protected] not found in backing auth system", | ||
), | ||
# No binding field + newly seen user -> New unlinked user. | ||
(None, "someid", "someuser", "[email protected]", None, None), | ||
# username binding field + valid username -> fully bound user. | ||
( | ||
"username", | ||
"someid", | ||
"someuser", | ||
"[email protected]", | ||
None, | ||
"username someuser not found in backing auth system", | ||
), | ||
# sub binding field + valid sub -> fully bound user. | ||
( | ||
"sub", | ||
"someuser", | ||
"someusername", | ||
"[email protected]", | ||
None, | ||
"sub someuser not found in backing auth system", | ||
), | ||
# email binding field + valid email -> fully bound user. | ||
( | ||
"email", | ||
"someid", | ||
"someuser", | ||
"[email protected]", | ||
None, | ||
"email [email protected] not found in backing auth system", | ||
), | ||
], | ||
) | ||
def test_new_account_via_oidc( | ||
binding_field, lid, lusername, lemail, additional_login_info, expected_error, app | ||
): | ||
existing_user_count = database.User.select().count() | ||
|
||
config = {"GITHUB": {}} | ||
if binding_field is not None: | ||
config["GITHUB"]["LOGIN_BINDING_FIELD"] = binding_field | ||
|
||
external_auth = GithubOAuthService(config, "GITHUB") | ||
internal_auth = _get_users_handler("OIDC") | ||
|
||
result = _conduct_oauth_login( | ||
app.config, | ||
analytics, | ||
internal_auth, | ||
external_auth, | ||
lid, | ||
lusername, | ||
lemail, | ||
additional_login_info, | ||
) | ||
assert result.error_message == expected_error | ||
|
||
current_user_count = database.User.select().count() | ||
if expected_error is None: | ||
assert current_user_count == existing_user_count + 1 | ||
assert result.user_obj is not None | ||
|
||
# Check the service bindings. | ||
external_login = model.user.lookup_federated_login( | ||
result.user_obj, external_auth.service_id() | ||
) | ||
assert external_login is not None | ||
|
||
internal_login = model.user.lookup_federated_login( | ||
result.user_obj, internal_auth.federated_service | ||
) | ||
if binding_field is not None: | ||
assert internal_login is not None | ||
else: | ||
assert internal_login is None | ||
else: | ||
# Ensure that no additional users were created. | ||
assert current_user_count == existing_user_count | ||
|
||
|
||
def test_existing_account_in_oidc(app, oidc_login_service): | ||
# Add an existing federated user bound to the OIDC account associated with `someuser`. | ||
bound_user = model.user.create_federated_user( | ||
"someuser", "[email protected]", oidc_login_service.service_id(), "someuser", False | ||
) | ||
existing_user_count = database.User.select().count() | ||
|
||
# Conduct OAuth login with the same lid and bound field. | ||
result = _conduct_oauth_login( | ||
app.config, | ||
analytics, | ||
UserAuthentication, | ||
oidc_login_service, | ||
"someuser", | ||
bound_user.username, | ||
bound_user.email, | ||
) | ||
assert result.error_message is None | ||
|
||
# Ensure that the same user was returned, and that it is now bound to the Github account | ||
# as well. | ||
assert result.user_obj.id == bound_user.id | ||
|
||
# Ensure that no additional users were created. | ||
current_user_count = database.User.select().count() | ||
assert current_user_count == existing_user_count | ||
|
||
# Check the service bindings. | ||
external_login = model.user.lookup_federated_login( | ||
result.user_obj, oidc_login_service.service_id() | ||
) | ||
assert external_login is not None |