Skip to content

Commit

Permalink
vdk-control-api-auth: add get_authenticated_username
Browse files Browse the repository at this point in the history
Add to Authentication library a method to get_authenticated_username.

The purpose of this method is ot be used during debugging , in logging
and in telemetry use cases.

Testing Done: unit tests
  • Loading branch information
antoniivanov committed Aug 3, 2023
1 parent dbfc8e5 commit 4dd9db7
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 3 deletions.
2 changes: 0 additions & 2 deletions projects/vdk-plugins/vdk-control-api-auth/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
pytest
pytest-httpserver
requests
requests_oauthlib
vdk-test-utils
2 changes: 1 addition & 1 deletion projects/vdk-plugins/vdk-control-api-auth/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"authentication.",
long_description=pathlib.Path("README.md").read_text(),
long_description_content_type="text/markdown",
install_requires=["requests", "requests_oauthlib"],
install_requires=["requests", "requests_oauthlib", "PyJWT"],
package_dir={"": "src"},
packages=setuptools.find_namespace_packages(where="src"),
classifiers=[
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import Optional

from vdk.plugin.control_api_auth.auth_config import InMemAuthConfiguration
from vdk.plugin.control_api_auth.auth_exception import VDKInvalidAuthParamError
from vdk.plugin.control_api_auth.autorization_code_auth import RedirectAuthentication
from vdk.plugin.control_api_auth.base_auth import BaseAuth
from vdk.plugin.control_api_auth.login_types import LoginTypes

log = logging.getLogger(__name__)


class Authentication:
"""Main class used for authentication."""
Expand Down Expand Up @@ -90,6 +95,39 @@ def authenticate(self) -> None:
countermeasure="Provide a valid auth_type.",
)

def get_authenticated_username(self) -> Optional[str]:
"""
Extract user name about currently authenticated user.
It tries to get username or if not available user email or if that fails some user id.
The reponse format should not be relied upon, it's meant to be used for logging and telemetry.
"""
if self._username:
return self._username

if not self._token:
self._token = self.read_access_token()
if not self._token:
return None

try:
import jwt

jwt_payload = jwt.decode(self._token, options={"verify_signature": False})
if not jwt_payload:
return None

possible_user_keys = ["username", "acct", "preferred_username", "email"]
for key in possible_user_keys:
user_id = jwt_payload.get(key)
if user_id:
return user_id

except Exception as e:
log.debug(
f"Could not to extract user information from the token due to {str(e)}"
)
return None

def read_access_token(self):
"""Read access token from cache."""
return self._auth.read_access_token()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from unittest.mock import patch

import pytest
from vdk.plugin.control_api_auth.authentication import Authentication


def test_get_authenticated_username_with_username():
auth = Authentication(username="testuser")
assert auth.get_authenticated_username() == "testuser"


@patch("jwt.decode")
def test_get_authenticated_username_with_token_username(mock_jwt_decode):
mock_jwt_decode.return_value = {"username": "testuser"}
auth = Authentication(token="testtoken")
assert auth.get_authenticated_username() == "testuser"


@pytest.mark.parametrize("user_id_field", ["acct", "email", "preferred_username"])
@patch("jwt.decode")
def test_get_authenticated_username_with_token_other_fields(
mock_jwt_decode, user_id_field
):
mock_jwt_decode.return_value = {user_id_field: "testuser"}
auth = Authentication(token="testtoken")
assert auth.get_authenticated_username() == "testuser"


@patch("jwt.decode")
def test_get_authenticated_username_with_token_no_user_id(mock_jwt_decode):
mock_jwt_decode.return_value = {}
auth = Authentication(token="testtoken")
assert auth.get_authenticated_username() is None


def test_get_authenticated_username_without_username_or_token():
auth = Authentication()
assert auth.get_authenticated_username() is None

0 comments on commit 4dd9db7

Please sign in to comment.