Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add use_env_var flag to client #923

Merged
merged 11 commits into from
Dec 22, 2022
11 changes: 6 additions & 5 deletions docs/samples/feature_embedding.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"outputs": [],
"source": [
"import json\n",
"import os\n",
"\n",
"import pandas as pd\n",
"from pyspark.sql import DataFrame\n",
Expand Down Expand Up @@ -102,7 +103,7 @@
},
"outputs": [],
"source": [
"RESOURCE_PREFIX = None # TODO fill the value\n",
"RESOURCE_PREFIX = \"\" # TODO fill the value\n",
"PROJECT_NAME = \"hotel_reviews_embedding\"\n",
"\n",
"REGISTRY_ENDPOINT = f\"https://{RESOURCE_PREFIX}webapp.azurewebsites.net/api/v1\"\n",
Expand All @@ -114,8 +115,8 @@
" SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = f\"https://{ctx.tags().get('browserHostName').get()}\"\n",
"else:\n",
" # TODO fill the values.\n",
" DATABRICKS_WORKSPACE_TOKEN_VALUE = None\n",
" SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = None\n",
" DATABRICKS_WORKSPACE_TOKEN_VALUE = os.environ.get(\"DATABRICKS_WORKSPACE_TOKEN_VALUE\")\n",
" SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = os.environ.get(\"SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL\")\n",
"\n",
"# We'll need an authentication credential to access Azure resources and register features \n",
"USE_CLI_AUTH = False # Set True to use interactive authentication\n",
Expand Down Expand Up @@ -146,7 +147,6 @@
" credential = AzureCliCredential(additionally_allowed_tenants=['*'],)\n",
"elif AZURE_TENANT_ID and AZURE_CLIENT_ID and AZURE_CLIENT_SECRET:\n",
" # Use Environment variable secret\n",
" import os\n",
" from azure.identity import EnvironmentCredential\n",
" os.environ[\"AZURE_TENANT_ID\"] = AZURE_TENANT_ID\n",
" os.environ[\"AZURE_CLIENT_ID\"] = AZURE_CLIENT_ID\n",
Expand Down Expand Up @@ -315,6 +315,7 @@
"client = FeathrClient(\n",
" config_path=config_path,\n",
" credential=credential,\n",
" use_env_vars=False,\n",
")"
]
},
Expand Down Expand Up @@ -791,7 +792,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.4"
"version": "3.10.8 (main, Nov 24 2022, 14:13:03) [GCC 11.2.0]"
},
"vscode": {
"interpreter": {
Expand Down
32 changes: 21 additions & 11 deletions feathr_project/feathr/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import os
import tempfile
from typing import Dict, List, Union
from typing import Any, Dict, List, Union

from azure.identity import DefaultAzureCredential
from feathr.definition.transformation import WindowAggTransformation
Expand Down Expand Up @@ -53,21 +53,31 @@ class FeathrClient(object):
Users of this client are responsible for setting up all the information needed to start a Redis client, via
environment variables or a Spark cluster. The host address, port, and password are needed to start the Redis client.

Attributes:
config_path (str, optional): config path. See [Feathr Config Template](https://github.com/feathr-ai/feathr/blob/main/feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml) for more details. Defaults to "./feathr_config.yaml".
local_workspace_dir (str, optional): set where is the local work space dir. If not set, Feathr will create a temporary folder to store local workspace related files.
credential (optional): credential to access cloud resources, most likely to be the returned result of DefaultAzureCredential(). If not set, Feathr will initialize DefaultAzureCredential() inside the __init__ function to get credentials.
project_registry_tag (Dict[str, str]): adding tags for project in Feathr registry. This might be useful if you want to tag your project as deprecated, or allow certain customizations on project level. Default is empty.

Raises:
RuntimeError: Fail to create the client since necessary environment variables are not set for Redis
client creation.
client creation.
"""
def __init__(self, config_path:str = "./feathr_config.yaml", local_workspace_dir: str = None, credential=None, project_registry_tag: Dict[str, str]=None):
def __init__(
self,
config_path:str = "./feathr_config.yaml",
local_workspace_dir: str = None,
credential: Any = None,
project_registry_tag: Dict[str, str] = None,
use_env_vars: bool = True,
):
"""Initialize Feathr Client.

Args:
config_path (optional): Config yaml file path. See [Feathr Config Template](https://github.com/feathr-ai/feathr/blob/main/feathr_project/feathrcli/data/feathr_user_workspace/feathr_config.yaml) for more details. Defaults to "./feathr_config.yaml".
local_workspace_dir (optional): Set where is the local work space dir. If not set, Feathr will create a temporary folder to store local workspace related files.
credential (optional): Azure credential to access cloud resources, most likely to be the returned result of DefaultAzureCredential(). If not set, Feathr will initialize DefaultAzureCredential() inside the __init__ function to get credentials.
project_registry_tag (optional): Adding tags for project in Feathr registry. This might be useful if you want to tag your project as deprecated, or allow certain customizations on project level. Default is empty.
use_env_vars (optional): Whether to use environment variables to set up the client. If set to False, the client will not use environment variables to set up the client. Defaults to True.
"""
self.logger = logging.getLogger(__name__)
# Redis key separator
self._KEY_SEPARATOR = ':'
self.envutils = _EnvVaraibleUtil(config_path)
self.envutils = _EnvVaraibleUtil(config_path, use_env_vars)
if local_workspace_dir:
self.local_workspace_dir = local_workspace_dir
else:
Expand Down Expand Up @@ -543,7 +553,7 @@ def _get_offline_features_with_config(self,
- Job configuration are like "configurations" for the spark job and are usually spark specific. For example, we want to control the no. of write parts for spark
Job configurations and job arguments (or sometimes called job parameters) have quite some overlaps (i.e. you can achieve the same goal by either using the job arguments/parameters vs. job configurations). But the job tags should just be used for metadata purpose.
'''

# submit the jars
return self.feathr_spark_launcher.submit_feathr_job(
job_name=self.project_name + '_feathr_feature_join_job',
Expand Down
146 changes: 86 additions & 60 deletions feathr_project/feathr/utils/_envvariableutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,101 @@
from azure.core.exceptions import ResourceNotFoundError

class _EnvVaraibleUtil(object):
Yuqing-cat marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self, config_path):
"""A utility class to read config variables.
If use_env_vars set to False, `get_environment_variable_with_default` will not use os environment variables.
Yuqing-cat marked this conversation as resolved.
Show resolved Hide resolved
Note, `get_environment_variable` still uses os environment variables.
"""

def __init__(self, config_path: str, use_env_vars: bool = True):
    """Initialize the utility class.

    Args:
        config_path: Config file path.
        use_env_vars (optional): Whether `get_environment_variable_with_default` may read
            os environment variables before falling back to the config file. Defaults to True.
    """
    self.config_path = config_path
    # Must be set before any getter runs, since the getters read this flag.
    self.use_env_vars = use_env_vars

    # Set to None first to avoid an invalid reference while the lookups below run.
    self.akv_name = None
    # The key vault name is resolved with the low-level helpers (environment first,
    # then config file). Note that this lookup reads the environment regardless of
    # `use_env_vars`.
    self.akv_name = (
        self._get_variable_from_env("secrets__azure_key_vault__name")
        or self._get_variable_from_file("secrets", "azure_key_vault", "name")
    )
    # No client is created when no key vault name is configured.
    self.akv_client = AzureKeyVaultClient(self.akv_name) if self.akv_name else None

def get_environment_variable_with_default(self, *args) -> str:
    """Gets the Feathr config variable for the given variable keys.

    Args:
        *args: list of keys in `config_path` yaml file.
            For example, to get `SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL`,
            you may call `get_environment_variable_with_default("SPARK_CONFIG", "DATABRICKS", "WORKSPACE_INSTANCE_URL")`

    Returns:
        Feathr client's config variable. It will retrieve the value in the following order:
            - From the environment variable if `use_env_vars == True` and the key is set in the os environment variables.
            - From the config yaml file.
            - From the Azure Key Vault (only consulted when a key vault name is configured).
        If the key is not found in any of the above, it will return None.
    """
    # The keys are joined with a double underscore to form the flat variable name
    # used for the environment and key vault lookups.
    variable_key = "__".join(args)

    # Sources are chained with `or`, so a falsy value from one source (e.g. an empty
    # string) falls through to the next source.
    env_var = (
        (self._get_variable_from_env(variable_key) if self.use_env_vars else None)
        or self._get_variable_from_file(*args)
        or (self._get_variable_from_akv(variable_key) if self.akv_name else None)
    )

    if env_var is None:
        logger.warning(f"Environment variable {variable_key} doesn't exist in environment variable, YAML config file, and key vault service.")

    return env_var

def get_environment_variable(self, variable_key: str) -> str:
    """Gets the Feathr config variable for the given variable key.

    Unlike `get_environment_variable_with_default`, this method never reads the
    YAML config file and does not check `use_env_vars`.

    Args:
        variable_key: environment variable key that is used to retrieve the environment variable

    Returns:
        Feathr client's config variable. It will retrieve the value in the following order:
            - From the environment variable if the key is set in the os environment variables.
            - From the Azure Key Vault (only consulted when a key vault name is configured).
        If the key is not found in any of the above, it will return None.
    """
    env_var = (
        self._get_variable_from_env(variable_key)
        or (self._get_variable_from_akv(variable_key) if self.akv_name else None)
    )

    if env_var is None:
        # Only the two sources actually searched are named, so the message does not
        # send the reader to the YAML config file.
        logger.warning(f"Environment variable {variable_key} doesn't exist in environment variable or key vault service.")

    return env_var

def _get_variable_from_env(self, variable_key: str) -> str:
    """Look up `variable_key` in the os environment variables.

    Args:
        variable_key: Name of the environment variable.

    Returns:
        The variable's value, or None when it is unset or set to an empty string.
    """
    # Make it work for lower case and upper case: the key is tried as given first,
    # and the upper-cased key is used only when the exact key is absent.
    env_variable = os.environ.get(variable_key, os.environ.get(variable_key.upper()))

    if env_variable:
        return env_variable

    logger.info(f"{variable_key} is not set in the environment variables.")
    return None

def _get_variable_from_akv(self, variable_key: str) -> str:
    """Look up `variable_key` through the key vault client.

    Args:
        variable_key: Name of the secret to retrieve.

    Returns:
        Whatever `akv_client.get_feathr_akv_secret` returns for the key, or None when
        no key vault client is configured or the lookup raises `ResourceNotFoundError`.
    """
    # `akv_client` is None when no key vault name was configured; return None
    # instead of failing with an AttributeError on a direct call.
    if self.akv_client is None:
        return None

    try:
        return self.akv_client.get_feathr_akv_secret(variable_key)
    except ResourceNotFoundError:
        logger.warning(f"Resource {self.akv_name} not found")

    return None

def _get_variable_from_file(self, *args) -> str:
if os.path.exists(os.path.abspath(self.config_path)):
with open(os.path.abspath(self.config_path), 'r') as stream:
with open(os.path.abspath(self.config_path), "r") as stream:
try:
yaml_config = yaml.safe_load(stream)
# concat all layers and check in environment variable
Expand All @@ -48,44 +110,8 @@ def get_environment_variable_with_default(self, *args):
yaml_layer = yaml_layer[arg]
return yaml_layer
except KeyError as exc:
logger.info("{} not found in the config file.", env_keyword)
logger.info(f"{': '.join(args)} not found in the config file.")
except yaml.YAMLError as exc:
logger.warning(exc)

# If it's not available in the feathr_config.yaml file, Feathr will try to retrieve the value from key vault
if self.akv_name:
try:
return self.akv_client.get_feathr_akv_secret(env_keyword)
except ResourceNotFoundError:
# print out warning message if cannot find the env variable in all the resources
logger.warning('Environment variable {} not found in environment variable, default YAML config file, or key vault service.', env_keyword)
return None

def get_environment_variable(self, variable_key):
    """Gets the environment variable for the variable key.

    Args:
        variable_key: environment variable key that is used to retrieve the environment variable

    Returns:
        The value for the variable key. It will retrieve the value in the following order:
            - If the key is set to a non-empty value in the environment variables, that value is used.
            - Otherwise, if a key vault name is configured, the value is retrieved from the key vault.
        If not found, None is returned and a warning is logged.
    """
    env_var_value = os.environ.get(variable_key)

    if env_var_value:
        return env_var_value

    # If it's not available in the environment variables, Feathr will try to retrieve the value from key vault
    logger.info(variable_key + ' is not set in the environment variables.')

    if self.akv_name:
        try:
            return self.akv_client.get_feathr_akv_secret(variable_key)
        except ResourceNotFoundError:
            # Fall through to the shared not-found warning below.
            pass

    # Warn whenever the variable cannot be found, including when no key vault is
    # configured, so a missing variable is never a silent None.
    logger.warning('Environment variable {} not found in environment variable or key vault service.', variable_key)
    return None

return None
Loading