diff --git a/projects/vdk-control-cli/requirements.txt b/projects/vdk-control-cli/requirements.txt
index c9fbd3293a..27239aae8f 100644
--- a/projects/vdk-control-cli/requirements.txt
+++ b/projects/vdk-control-cli/requirements.txt
@@ -24,5 +24,6 @@ tabulate
# and use requests_oauthlib only, but for now it seems like there are some custom logic with the first
# OAuth2 provider which needs to be handled that way
urllib3>=1.26.5
+vdk-control-api-auth
vdk-control-service-api==1.0.6
diff --git a/projects/vdk-control-cli/setup.cfg b/projects/vdk-control-cli/setup.cfg
index 294f12dc31..42bf65b501 100644
--- a/projects/vdk-control-cli/setup.cfg
+++ b/projects/vdk-control-cli/setup.cfg
@@ -54,6 +54,7 @@ install_requires =
tabulate
requests_oauthlib>=1.0
urllib3>=1.26.5
+ vdk-control-api-auth
# Require a specific Python version
python_requires = >=3.7, <3.11
diff --git a/projects/vdk-control-cli/src/vdk/internal/control/auth/apikey_auth.py b/projects/vdk-control-cli/src/vdk/internal/control/auth/apikey_auth.py
deleted file mode 100644
index 60d5dc80fb..0000000000
--- a/projects/vdk-control-cli/src/vdk/internal/control/auth/apikey_auth.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# Copyright 2021 VMware, Inc.
-# SPDX-License-Identifier: Apache-2.0
-from typing import Optional
-
-from vdk.internal.control.auth.auth import Authentication
-from vdk.internal.control.auth.login_types import LoginTypes
-from vdk.internal.control.configuration.vdk_config import VDKConfig
-
-
-class ApiKeyAuthentication:
- """
- Class that execute authentication process using API token.
- It will use the API token to get temporary access token using api token authorization URL.
- See Authentication class as well.
- """
-
- def __init__(
- self,
- api_token_authorization_url: Optional[str] = None,
- api_token: Optional[str] = None,
- ):
- """
- :param api_token_authorization_url: Authorization URL - Same as login --api-token-authorization-server-url.
- :param api_token: API Token - Same as login --api-token.
- """
- self.__api_token = api_token
- self.__api_token_authorization_url = api_token_authorization_url
- self.__auth = Authentication()
-
- def authentication_process(self) -> None:
- """
- Executes the authentication process and caches the generated access token so it can be used during REST calls.
- """
- self.__auth.update_api_token_authorization_url(
- self.__api_token_authorization_url
- )
- self.__auth.update_api_token(self.__api_token)
- self.__auth.update_auth_type(LoginTypes.API_TOKEN.value)
- self.__auth.acquire_and_cache_access_token()
diff --git a/projects/vdk-control-cli/src/vdk/internal/control/auth/auth_pkce.py b/projects/vdk-control-cli/src/vdk/internal/control/auth/auth_pkce.py
deleted file mode 100644
index c048e69740..0000000000
--- a/projects/vdk-control-cli/src/vdk/internal/control/auth/auth_pkce.py
+++ /dev/null
@@ -1,22 +0,0 @@
-# Copyright 2021 VMware, Inc.
-# SPDX-License-Identifier: Apache-2.0
-import base64
-import hashlib
-import os
-import re
-
-
-class AuthPkce:
- @staticmethod
- def generate_pkce_codes():
- """
- Generate PKCE code challenge and code verifier necessary during Authorization Code Workflow
- as described in RFC 7636 (see https://tools.ietf.org/html/rfc7636)
- :return: code_verifier, code_challenge, code_challenge_method
- """
- code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode("utf-8")
- code_verifier = re.sub("[^a-zA-Z0-9]+", "", code_verifier)
- code_challenge = hashlib.sha256(code_verifier.encode("utf-8")).digest()
- code_challenge = base64.urlsafe_b64encode(code_challenge).decode("utf-8")
- code_challenge = code_challenge.replace("=", "")
- return code_verifier, code_challenge, "S256"
diff --git a/projects/vdk-control-cli/src/vdk/internal/control/auth/redirect_auth.py b/projects/vdk-control-cli/src/vdk/internal/control/auth/redirect_auth.py
deleted file mode 100644
index 357c14f4e4..0000000000
--- a/projects/vdk-control-cli/src/vdk/internal/control/auth/redirect_auth.py
+++ /dev/null
@@ -1,310 +0,0 @@
-# Copyright 2021 VMware, Inc.
-# SPDX-License-Identifier: Apache-2.0
-import http
-import json
-import logging
-import os
-import socket
-import time
-import webbrowser
-from contextlib import closing
-from functools import partial
-from http.server import BaseHTTPRequestHandler
-from http.server import HTTPServer
-from urllib.parse import parse_qs
-from urllib.parse import urlparse
-
-import click
-from requests import HTTPError
-from requests import post
-from requests.auth import HTTPBasicAuth
-from requests_oauthlib import OAuth2Session
-from vdk.internal.control.auth.auth import Authentication
-from vdk.internal.control.auth.auth_pkce import AuthPkce
-from vdk.internal.control.auth.auth_request_values import AuthRequestValues
-from vdk.internal.control.auth.login_types import LoginTypes
-from vdk.internal.control.exception.vdk_exception import VDKException
-
-log = logging.getLogger(__name__)
-
-
-class LoginHandler:
- CODE_PARAMETER_KEY = "code"
- STATE_PARAMETER_KEY = "state"
- REFRESH_TOKEN_KEY = "refresh_token" # nosec
- GRANT_TYPE = "authorization_code"
-
- def __init__(
- self, client_id, client_secret, exchange_endpoint, redirect_uri, code_verifier
- ):
- self.client_id = client_id
- self.client_secret = client_secret
- self.exchange_endpoint = exchange_endpoint
- self.redirect_uri = redirect_uri
- self.code_verifier = code_verifier
- self.login_exception = None
-
- def login_with_authorization_code(self, path):
- try:
- auth_code = self._acquire_auth_code(path)
- json_content = self._exchange_code_for_tokens(auth_code)
- auth = Authentication()
- auth.update_oauth2_authorization_url(self.exchange_endpoint)
- if self.REFRESH_TOKEN_KEY in json_content:
- auth.update_refresh_token(json_content[self.REFRESH_TOKEN_KEY])
- auth.update_access_token(
- json_content[AuthRequestValues.ACCESS_TOKEN_KEY.value]
- )
- auth.update_access_token_expiration_time(
- time.time()
- + int(json_content[AuthRequestValues.EXPIRATION_TIME_KEY.value])
- )
- auth.update_client_id(self.client_id)
- auth.update_client_secret(self.client_secret)
- auth.update_auth_type(LoginTypes.CREDENTIALS.value)
- except Exception as login_exception:
- self.login_exception = login_exception
- raise
-
- def _exchange_code_for_tokens(self, auth_code):
- headers = {
- AuthRequestValues.CONTENT_TYPE_HEADER.value: AuthRequestValues.CONTENT_TYPE_URLENCODED.value,
- }
- data = (
- f"code={auth_code}&"
- + f"grant_type={self.GRANT_TYPE}&"
- + f"redirect_uri={self.redirect_uri}"
- )
- if not self.client_secret:
- log.debug(
- "No client secret specified. We assume native app workflow with PKCE (RFC 7636)."
- )
- data = f"{data}&code_verifier={self.code_verifier}"
- basic_auth = HTTPBasicAuth(self.client_id, self.client_secret)
- try:
- response = post(
- self.exchange_endpoint, data=data, headers=headers, auth=basic_auth
- )
- response.raise_for_status()
- json_data = json.loads(response.text)
- except HTTPError as http_exception:
- raise VDKException(
- what="Failed to login.",
- why=f"HTTP error occurred during authorization workflow. "
- f"Error was: HTTP error {http_exception.response.status_code}: {http_exception.response.content}",
- consequence="Operations may not work unless previous login is still valid.",
- countermeasure="Try to login again.\n"
- " If problem persist, try to see the reason in the why section and instruction there.\n"
- " If that does not help, open ticket to the support team. "
- "Provide all logs you have and describe exact steps to reproduce the issue "
- "and commands executed.",
- )
- except Exception as e:
- raise VDKException(
- what=f"Failed to login: {str(e)}.",
- why=f"Problem in the configuration or service. Cannot acquire tokens.",
- consequence="Cannot login user.",
- countermeasure="Contact the owner to resolve the problem.",
- )
- return json_data
-
- def _acquire_auth_code(self, path):
- url = urlparse(path)
- query_components = parse_qs(url.query)
- auth_code = ""
- state = ""
- if self.CODE_PARAMETER_KEY in query_components:
- auth_code = query_components[self.CODE_PARAMETER_KEY][0]
- if self.STATE_PARAMETER_KEY in query_components:
- state = query_components[self.STATE_PARAMETER_KEY][0]
- if state != AuthRequestValues.STATE_PARAMETER_VALUE.value or not state:
- raise VDKException(
- what=f"Failed to login.",
- why=f"Possibly the request was intercepted.",
- consequence="Cannot login user.",
- countermeasure="Try to login again.",
- )
- if not auth_code:
- raise VDKException(
- what=f"Authentication code is empty",
- why=f"The user failed to authenticate properly.",
- consequence="User will not be logged in.",
- countermeasure="Try to login again.",
- )
- return auth_code
-
-
-class MyHttpRequestHandler(BaseHTTPRequestHandler):
- """
- Class used by RedirectAuthentication to handle the GET redirect request in order to acquire the refresh and access
- tokens. In essence the class creates Authentication object which fills the necessary configuration fields for the
- credentials authentication type.
- """
-
- UTF_ENCODING = "utf8"
- CONTENT_TYPE_TEXT_HTML = "text/html"
- HTML_LOGIN_SUCCESS_TEMPLATE = (
- "
Login Successful!
"
- )
- HTML_LOGIN_FAILURE_TEMPLATE = "Login Failed. Check terminal for more information.
"
-
- def __init__(self, login_handler, *args, **kwargs):
- self.login_handler = login_handler
- super().__init__(*args, **kwargs)
-
- def do_GET(self):
- self.send_response(http.HTTPStatus.OK)
- self.send_header(
- AuthRequestValues.CONTENT_TYPE_HEADER.value, self.CONTENT_TYPE_TEXT_HTML
- )
- self.end_headers()
- try:
- self.login_handler.login_with_authorization_code(self.path)
- self.wfile.write(bytes(self.HTML_LOGIN_SUCCESS_TEMPLATE, self.UTF_ENCODING))
- except:
- self.wfile.write(bytes(self.HTML_LOGIN_FAILURE_TEMPLATE, self.UTF_ENCODING))
- raise
-
-
-class RedirectAuthentication:
- """
- Class used to start web browser and http process which will handle single GET request and shut down.
- The browser will redirect to the http process which handles the request.
-
- Making the port which we use for authentication constant enable integration with some OAuth2 authorization server
- (e.g VMware Cloud) which require a redirect uri when creating OAuth2 app which needs to be set to the exact
- value which is used in the redirect url. Before we took random port from the ones available which prevented us
- from configuring the URI in the application options.
- It might be good idea for this to be configurable so we can switch to random ports when necessary
- """
-
- def __init__(
- self,
- client_id,
- client_secret,
- oauth2_discovery_url,
- oauth2_exchange_url,
- redirect_uri="http://127.0.0.1",
- redirect_uri_default_port=31113,
- ):
- """
-
- :param client_id:
- The client identifier of the OAuth2 Application. Find out more https://tools.ietf.org/html/rfc6749#section-2.3.1
- :param client_secret:
- The client secret of the OAuth2 Application. Find out more https://tools.ietf.org/html/rfc6749#section-2.3.1
- :param oauth2_discovery_url:
- Token or Authorization URI used to exchange grant for access token
- :param oauth2_exchange_url:
- The authorization endpoint for which
- :param redirect_uri:
- The redirect uri which will be used in Authorization Workflow.
- Per https://tools.ietf.org/html/rfc8252#section-7.3 it should be http://127.0.0.1
- so there should not be reason to override the default except for tests
- :param redirect_uri_default_port:
- The default port to use for redirect uri unless env. variable 'OAUTH2_REDIRECT_URI_PORT' is used.
- If None - then random one is assigned
- """
- self.client_id = client_id
- self.client_secret = client_secret
- self.oauth2_discovery_url = oauth2_discovery_url
- self.oauth2_exchange_url = oauth2_exchange_url
-
- env_port = os.getenv("OAUTH2_REDIRECT_URI_PORT", default=None)
- if env_port:
- self.port = int(env_port)
- elif redirect_uri_default_port is not None:
- self.port = redirect_uri_default_port
- else:
- self.port = self.find_free_port()
- self.redirect_uri = f"{redirect_uri}:{self.port}"
-
- (
- self.code_verifier,
- self.code_challenge,
- self.code_challenge_method,
- ) = AuthPkce.generate_pkce_codes()
-
- def authentication_process(self):
- authorization_url = self._create_authorization_redirect_url()
- discovery_endpoint = authorization_url[0]
- login_handler = self._create_login_handler()
- handler = self._create_redirect_handler(login_handler)
- self._redirect(discovery_endpoint, handler, login_handler)
-
- def _redirect(self, discovery_endpoint: str, handler, login_handler: LoginHandler):
- with HTTPServer(("", self.port), handler) as server:
- click.echo(f"Opening browser at:\n{discovery_endpoint}")
- is_open = webbrowser.open(discovery_endpoint)
- if not is_open:
- click.echo(
- "We failed to open the browser automatically and will proceed to login manually.\n"
- "Please, follow below instructions:"
- )
- self._manual_login(discovery_endpoint, login_handler)
- else: # TODO: that's not very good UX, let's timeout after 1 minute
- click.echo(
- f"Press [Ctrl + C]/[Command + C] to quit in case of error in the browser."
- )
- server.handle_request()
- if login_handler.login_exception:
- raise login_handler.login_exception
-
- @staticmethod
- def _create_redirect_handler(login_handler: LoginHandler):
- return partial(MyHttpRequestHandler, login_handler)
-
- def _create_login_handler(self) -> LoginHandler:
- return LoginHandler(
- self.client_id,
- self.client_secret,
- self.oauth2_exchange_url,
- self.redirect_uri,
- self.code_verifier,
- )
-
- def _create_authorization_redirect_url(self):
- oauth = OAuth2Session(client_id=self.client_id, redirect_uri=self.redirect_uri)
- if not self.client_secret:
- log.debug(
- "No client secret specified. We assume native app workflow with PKCE (RFC 7636)."
- )
- return oauth.authorization_url(
- self.oauth2_discovery_url,
- state=AuthRequestValues.STATE_PARAMETER_VALUE.value,
- prompt=AuthRequestValues.LOGIN_PROMPT.value,
- code_challenge=self.code_challenge,
- code_challenge_method=self.code_challenge_method,
- )
- else:
- return oauth.authorization_url(
- self.oauth2_discovery_url,
- state=AuthRequestValues.STATE_PARAMETER_VALUE.value,
- prompt=AuthRequestValues.LOGIN_PROMPT.value,
- )
-
- @staticmethod
- def find_free_port():
- with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
- s.bind(("", 0))
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- return s.getsockname()[1]
-
- def _manual_login(self, discovery_endpoint, handler: LoginHandler):
- # manual login is necessary for environment where there is not a browser -
- # console only OS environment, WSL (Windows subsystem for linux)
- click.echo(
- f"Copy paste the following link in your browser:\n\n{discovery_endpoint}\n\n"
- )
- click.echo(
- f"Login using your company credentials and wait all redirects to finish."
- )
- click.echo(
- f"The last redirect will be to a page that starts with {self.redirect_uri} - "
- f"the page may show an error that site cannot be reached which you can ignore.\n"
- )
- click.echo(
- f"Please, copy the address that the browser was redirected to (it should start with {self.redirect_uri}) and paste it here:"
- )
- url = click.prompt("Copy-pasted URL")
- handler.login_with_authorization_code(url)
diff --git a/projects/vdk-control-cli/src/vdk/internal/control/command_groups/login_group/login.py b/projects/vdk-control-cli/src/vdk/internal/control/command_groups/login_group/login.py
index abcc75a4dc..36c38f3119 100644
--- a/projects/vdk-control-cli/src/vdk/internal/control/command_groups/login_group/login.py
+++ b/projects/vdk-control-cli/src/vdk/internal/control/command_groups/login_group/login.py
@@ -1,14 +1,12 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import click
-from vdk.internal.control.auth.apikey_auth import ApiKeyAuthentication
-from vdk.internal.control.auth.auth import Authentication
from vdk.internal.control.auth.login_types import LoginTypes
-from vdk.internal.control.auth.redirect_auth import RedirectAuthentication
-from vdk.internal.control.configuration.vdk_config import VDKConfig
from vdk.internal.control.exception.vdk_exception import VDKException
from vdk.internal.control.utils import cli_utils
from vdk.internal.control.utils.cli_utils import extended_option
+from vdk.plugin.control_api_auth.auth_exception import VDKAuthException
+from vdk.plugin.control_api_auth.authentication import Authentication
@click.command(
@@ -112,17 +110,28 @@ def login(
oauth2_exchange_url,
):
if auth_type == LoginTypes.CREDENTIALS.value:
- if not client_id or not oauth2_discovery_url or not oauth2_exchange_url:
- click.echo(
- f"Login type: {auth_type} requires arguments:, --client-secret, --client-id, "
- f"--oauth2-exchange-url, --oauth2-discovery-url"
+ try:
+ auth = Authentication(
+ client_id=client_id,
+ client_secret=client_secret,
+ auth_discovery_url=oauth2_discovery_url,
+ authorization_url=oauth2_exchange_url,
+ auth_type=auth_type,
+ cache_locally=True,
)
- else:
- redirect_auth = RedirectAuthentication(
- client_id, client_secret, oauth2_discovery_url, oauth2_exchange_url
- )
- redirect_auth.authentication_process()
+ auth.authenticate()
click.echo("Login Successful")
+ except VDKAuthException as e:
+ if (
+ "requires client_id and auth_discovery_url to be specified" in e.message
+ or "auth_url was not specified." in e.message
+ ):
+ click.echo(
+ f"Login type: {auth_type} requires arguments:, --client-secret, --client-id, "
+ "--oauth2-exchange-url, --oauth2-discovery-url"
+ )
+ else:
+ click.echo(f"Login failed. Error was: {e.message}")
elif auth_type == LoginTypes.API_TOKEN.value:
api_token = cli_utils.get_or_prompt("Oauth2 API token", api_token)
if not api_token:
@@ -134,8 +143,16 @@ def login(
countermeasure="Please login providing correct API Token. ",
)
else:
- apikey_auth = ApiKeyAuthentication(api_token_authorization_url, api_token)
- apikey_auth.authentication_process()
+ apikey_auth = Authentication(
+ authorization_url=api_token_authorization_url,
+ token=api_token,
+ auth_type=auth_type,
+ cache_locally=True,
+ )
+ try:
+ apikey_auth.authenticate()
+ except VDKAuthException as e:
+ click.echo(f"Login failed. Error was: {e.message}")
click.echo("Login Successful")
else:
click.echo(f"Login type: {auth_type} not supported")
diff --git a/projects/vdk-control-cli/tests/vdk/internal/control/auth/test_redirect_auth.py b/projects/vdk-control-cli/tests/vdk/internal/control/auth/test_redirect_auth.py
deleted file mode 100644
index 6170baa4cb..0000000000
--- a/projects/vdk-control-cli/tests/vdk/internal/control/auth/test_redirect_auth.py
+++ /dev/null
@@ -1,23 +0,0 @@
-# Copyright 2021 VMware, Inc.
-# SPDX-License-Identifier: Apache-2.0
-from vdk.internal import test_utils
-from vdk.internal.control.auth.redirect_auth import RedirectAuthentication
-
-
-# TODO: extend unit tests
-def test_verify_redirect_url():
- test_utils.allow_oauthlib_insecure_transport()
- auth = RedirectAuthentication(
- "client-id",
- "client-secret",
- "http://discovery-url",
- "http://exchange-url",
- "http://127.0.0.1",
- 9999,
- )
- authorization_url = auth._create_authorization_redirect_url()
- assert (
- authorization_url[0]
- == "http://discovery-url?response_type=code&client_id=client-id&redirect_uri=http%3A%2F%2F127.0.0.1%3A9999&state=requested&prompt=login"
- )
- assert authorization_url[1] == "requested"
diff --git a/projects/vdk-control-cli/tests/vdk/internal/control/command_groups/login_group/test_login.py b/projects/vdk-control-cli/tests/vdk/internal/control/command_groups/login_group/test_login.py
index 83f13ddb42..ba8de94ea7 100644
--- a/projects/vdk-control-cli/tests/vdk/internal/control/command_groups/login_group/test_login.py
+++ b/projects/vdk-control-cli/tests/vdk/internal/control/command_groups/login_group/test_login.py
@@ -139,3 +139,72 @@ def exchange_response(req: Request):
)
test_utils.assert_click_status(result, 0)
+
+
+def test_login_credentials_exceptions(httpserver: PluginHTTPServer):
+ with patch("webbrowser.open") as mock_browser:
+ test_utils.allow_oauthlib_insecure_transport()
+
+ mock_browser.side_effect = _mock_browser_login
+ httpserver.expect_request("/exchange").respond_with_json(
+ test_auth.get_json_response_mock()
+ )
+
+ runner = CliRunner()
+
+ # Assert error message when no --oauth2-exchange-url provided.
+ result = runner.invoke(
+ login,
+ [
+ "-t",
+ "credentials",
+ "--oauth2-discovery-url",
+ httpserver.url_for("/discovery"),
+ "--client-id",
+ "client_id",
+ "--client-secret",
+ "client_secret",
+ ],
+ )
+ assert (
+ "requires arguments:, --client-secret, --client-id, --oauth2-exchange-url, --oauth2-discovery-url"
+ in result.output
+ )
+
+ # Assert error message when no --oauth2-discovery-url provided.
+ result = runner.invoke(
+ login,
+ [
+ "-t",
+ "credentials",
+ "--oauth2-exchange-url",
+ httpserver.url_for("/exchange"),
+ "--client-id",
+ "client_id",
+ "--client-secret",
+ "client_secret",
+ ],
+ )
+ assert (
+ "requires arguments:, --client-secret, --client-id, --oauth2-exchange-url, --oauth2-discovery-url"
+ in result.output
+ )
+
+ # Assert error message when no --client-id provided.
+ result = runner.invoke(
+ login,
+ [
+ "-t",
+ "credentials",
+ "--oauth2-discovery-url",
+ httpserver.url_for("/discovery"),
+ "--oauth2-exchange-url",
+ httpserver.url_for("/exchange"),
+ "--client-secret",
+ "client_secret",
+ ],
+ )
+ assert (
+ "requires arguments:, --client-secret, --client-id, --oauth2-exchange-url, --oauth2-discovery-url"
+ in result.output
+ )