Skip to content

Commit

Permalink
vdk-control-api-auth: add get_authenticated_username (#2518)
Browse files Browse the repository at this point in the history
Add to Authentication library a method to get_authenticated_username.

The purpose of this method is ot be used during debugging , in logging
and in telemetry use cases.

Testing Done: unit tests
  • Loading branch information
antoniivanov authored Aug 9, 2023
1 parent f363c7f commit 5e7c40e
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 3 deletions.
2 changes: 0 additions & 2 deletions projects/vdk-plugins/vdk-control-api-auth/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
pytest
pytest-httpserver
requests
requests_oauthlib
vdk-test-utils
2 changes: 1 addition & 1 deletion projects/vdk-plugins/vdk-control-api-auth/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"authentication.",
long_description=pathlib.Path("README.md").read_text(),
long_description_content_type="text/markdown",
install_requires=["requests", "requests_oauthlib"],
install_requires=["requests", "requests_oauthlib", "PyJWT"],
package_dir={"": "src"},
packages=setuptools.find_namespace_packages(where="src"),
classifiers=[
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import List
from typing import Optional

from vdk.plugin.control_api_auth.auth_config import InMemAuthConfiguration
from vdk.plugin.control_api_auth.auth_exception import VDKInvalidAuthParamError
from vdk.plugin.control_api_auth.autorization_code_auth import RedirectAuthentication
from vdk.plugin.control_api_auth.base_auth import BaseAuth
from vdk.plugin.control_api_auth.login_types import LoginTypes

log = logging.getLogger(__name__)


class Authentication:
"""Main class used for authentication."""
Expand All @@ -21,6 +27,7 @@ def __init__(
auth_discovery_url: str = None,
auth_type: str = None,
cache_locally: bool = False,
possible_jwt_user_keys=None,
):
"""
:param username: A user's username in case basic authentication is used.
Expand All @@ -45,6 +52,9 @@ def __init__(
:param cache_locally:
A flag, indicating if credentials should be cached locally (in a
file).
:param possible_jwt_user_keys:
Used by get_authenticated_username to try to discover correct username if OAuth2 and JWT token is used. It is a list of keys where the first existing key in a JTW token is returned.
Defaults to some common user keys.
"""
self._username = username
self._password = password
Expand All @@ -59,6 +69,15 @@ def __init__(
self._auth = BaseAuth()
else:
self._auth = BaseAuth(conf=InMemAuthConfiguration())
if possible_jwt_user_keys:
self._possible_jwt_user_keys = possible_jwt_user_keys
else: # sensible defaults
self._possible_jwt_user_keys = [
"username",
"acct",
"preferred_username",
"email",
]

def authenticate(self) -> None:
if not self._auth_type:
Expand Down Expand Up @@ -90,6 +109,38 @@ def authenticate(self) -> None:
countermeasure="Provide a valid auth_type.",
)

def get_authenticated_username(self) -> Optional[str]:
"""
Extract user name about currently authenticated user.
It tries to get username or if not available user email or if that fails some user id.
The reponse format should not be relied upon, it's meant to be used for logging and telemetry.
"""
if self._username:
return self._username

if not self._token:
self._token = self.read_access_token()
if not self._token:
return None

try:
import jwt

jwt_payload = jwt.decode(self._token, options={"verify_signature": False})
if not jwt_payload:
return None

for key in self._possible_jwt_user_keys:
user_id = jwt_payload.get(key)
if user_id:
return user_id

except Exception:
log.debug(
f"Could not to extract user information from the token.", exc_info=True
)
return None

def read_access_token(self):
"""Read access token from cache."""
return self._auth.read_access_token()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from unittest.mock import patch

import pytest
from vdk.plugin.control_api_auth.authentication import Authentication


def test_get_authenticated_username_with_username():
auth = Authentication(username="testuser")
assert auth.get_authenticated_username() == "testuser"


@patch("jwt.decode")
def test_get_authenticated_username_with_token_username(mock_jwt_decode):
mock_jwt_decode.return_value = {"username": "testuser"}
auth = Authentication(token="testtoken")
assert auth.get_authenticated_username() == "testuser"


@pytest.mark.parametrize("user_id_field", ["acct", "email", "preferred_username"])
@patch("jwt.decode")
def test_get_authenticated_username_with_token_other_fields(
mock_jwt_decode, user_id_field
):
mock_jwt_decode.return_value = {user_id_field: "testuser"}
auth = Authentication(token="testtoken")
assert auth.get_authenticated_username() == "testuser"


@patch("jwt.decode")
def test_get_authenticated_username_with_token_no_user_id(mock_jwt_decode):
mock_jwt_decode.return_value = {}
auth = Authentication(token="testtoken")
assert auth.get_authenticated_username() is None


@patch("jwt.decode")
def test_get_authenticated_username_priority_order_kept(mock_jwt_decode):
mock_jwt_decode.return_value = {"sub": "user_from_sub", "acct": "user_from_acct"}
auth = Authentication(
token="testtoken", possible_jwt_user_keys=["user", "email", "sub", "acct"]
)
assert auth.get_authenticated_username() == "user_from_sub"


def test_get_authenticated_username_without_username_or_token():
auth = Authentication()
assert auth.get_authenticated_username() is None

0 comments on commit 5e7c40e

Please sign in to comment.