Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vdk-control-api-auth: add get_authenticated_username #2518

Merged
merged 3 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions projects/vdk-plugins/vdk-control-api-auth/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
pytest
pytest-httpserver
requests
requests_oauthlib
vdk-test-utils
2 changes: 1 addition & 1 deletion projects/vdk-plugins/vdk-control-api-auth/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"authentication.",
long_description=pathlib.Path("README.md").read_text(),
long_description_content_type="text/markdown",
install_requires=["requests", "requests_oauthlib"],
install_requires=["requests", "requests_oauthlib", "PyJWT"],
package_dir={"": "src"},
packages=setuptools.find_namespace_packages(where="src"),
classifiers=[
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import List
from typing import Optional

from vdk.plugin.control_api_auth.auth_config import InMemAuthConfiguration
from vdk.plugin.control_api_auth.auth_exception import VDKInvalidAuthParamError
from vdk.plugin.control_api_auth.autorization_code_auth import RedirectAuthentication
from vdk.plugin.control_api_auth.base_auth import BaseAuth
from vdk.plugin.control_api_auth.login_types import LoginTypes

log = logging.getLogger(__name__)


class Authentication:
"""Main class used for authentication."""
Expand All @@ -21,6 +27,7 @@ def __init__(
auth_discovery_url: str = None,
auth_type: str = None,
cache_locally: bool = False,
possible_jwt_user_keys=None,
):
"""
:param username: A user's username in case basic authentication is used.
Expand All @@ -45,6 +52,9 @@ def __init__(
:param cache_locally:
A flag, indicating if credentials should be cached locally (in a
file).
:param possible_jwt_user_keys:
Used by get_authenticated_username to try to discover correct username if OAuth2 and JWT token is used. It is a list of keys where the first existing key in a JTW token is returned.
Defaults to some common user keys.
"""
self._username = username
self._password = password
Expand All @@ -59,6 +69,15 @@ def __init__(
self._auth = BaseAuth()
else:
self._auth = BaseAuth(conf=InMemAuthConfiguration())
if possible_jwt_user_keys:
self._possible_jwt_user_keys = possible_jwt_user_keys
else: # sensible defaults
self._possible_jwt_user_keys = [
"username",
"acct",
"preferred_username",
"email",
]

def authenticate(self) -> None:
if not self._auth_type:
Expand Down Expand Up @@ -90,6 +109,38 @@ def authenticate(self) -> None:
countermeasure="Provide a valid auth_type.",
)

def get_authenticated_username(self) -> Optional[str]:
"""
Extract user name about currently authenticated user.
It tries to get username or if not available user email or if that fails some user id.
The reponse format should not be relied upon, it's meant to be used for logging and telemetry.
"""
if self._username:
return self._username

if not self._token:
self._token = self.read_access_token()
if not self._token:
return None

try:
import jwt

jwt_payload = jwt.decode(self._token, options={"verify_signature": False})
if not jwt_payload:
return None

for key in self._possible_jwt_user_keys:
user_id = jwt_payload.get(key)
if user_id:
return user_id

except Exception:
log.debug(
f"Could not to extract user information from the token.", exc_info=True
)
return None

def read_access_token(self):
"""Read access token from cache."""
return self._auth.read_access_token()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from unittest.mock import patch

import pytest
from vdk.plugin.control_api_auth.authentication import Authentication


def test_get_authenticated_username_with_username():
auth = Authentication(username="testuser")
assert auth.get_authenticated_username() == "testuser"


@patch("jwt.decode")
def test_get_authenticated_username_with_token_username(mock_jwt_decode):
mock_jwt_decode.return_value = {"username": "testuser"}
auth = Authentication(token="testtoken")
assert auth.get_authenticated_username() == "testuser"


@pytest.mark.parametrize("user_id_field", ["acct", "email", "preferred_username"])
@patch("jwt.decode")
def test_get_authenticated_username_with_token_other_fields(
mock_jwt_decode, user_id_field
):
mock_jwt_decode.return_value = {user_id_field: "testuser"}
auth = Authentication(token="testtoken")
assert auth.get_authenticated_username() == "testuser"


@patch("jwt.decode")
def test_get_authenticated_username_with_token_no_user_id(mock_jwt_decode):
mock_jwt_decode.return_value = {}
auth = Authentication(token="testtoken")
assert auth.get_authenticated_username() is None


@patch("jwt.decode")
def test_get_authenticated_username_priority_order_kept(mock_jwt_decode):
mock_jwt_decode.return_value = {"sub": "user_from_sub", "acct": "user_from_acct"}
auth = Authentication(
token="testtoken", possible_jwt_user_keys=["user", "email", "sub", "acct"]
)
assert auth.get_authenticated_username() == "user_from_sub"


def test_get_authenticated_username_without_username_or_token():
auth = Authentication()
assert auth.get_authenticated_username() is None