Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add use_env_var flag to client #923

Merged
merged 11 commits into from
Dec 22, 2022
11 changes: 6 additions & 5 deletions docs/samples/feature_embedding.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"outputs": [],
"source": [
"import json\n",
"import os\n",
"\n",
"import pandas as pd\n",
"from pyspark.sql import DataFrame\n",
Expand Down Expand Up @@ -102,7 +103,7 @@
},
"outputs": [],
"source": [
"RESOURCE_PREFIX = None # TODO fill the value\n",
"RESOURCE_PREFIX = \"\" # TODO fill the value\n",
"PROJECT_NAME = \"hotel_reviews_embedding\"\n",
"\n",
"REGISTRY_ENDPOINT = f\"https://{RESOURCE_PREFIX}webapp.azurewebsites.net/api/v1\"\n",
Expand All @@ -114,8 +115,8 @@
" SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = f\"https://{ctx.tags().get('browserHostName').get()}\"\n",
"else:\n",
" # TODO fill the values.\n",
" DATABRICKS_WORKSPACE_TOKEN_VALUE = None\n",
" SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = None\n",
" DATABRICKS_WORKSPACE_TOKEN_VALUE = os.environ.get(\"DATABRICKS_WORKSPACE_TOKEN_VALUE\")\n",
" SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = os.environ.get(\"SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL\")\n",
"\n",
"# We'll need an authentication credential to access Azure resources and register features \n",
"USE_CLI_AUTH = False # Set True to use interactive authentication\n",
Expand Down Expand Up @@ -146,7 +147,6 @@
" credential = AzureCliCredential(additionally_allowed_tenants=['*'],)\n",
"elif AZURE_TENANT_ID and AZURE_CLIENT_ID and AZURE_CLIENT_SECRET:\n",
" # Use Environment variable secret\n",
" import os\n",
" from azure.identity import EnvironmentCredential\n",
" os.environ[\"AZURE_TENANT_ID\"] = AZURE_TENANT_ID\n",
" os.environ[\"AZURE_CLIENT_ID\"] = AZURE_CLIENT_ID\n",
Expand Down Expand Up @@ -315,6 +315,7 @@
"client = FeathrClient(\n",
" config_path=config_path,\n",
" credential=credential,\n",
" use_env_vars=False,\n",
")"
]
},
Expand Down Expand Up @@ -791,7 +792,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.4"
"version": "3.10.8 (main, Nov 24 2022, 14:13:03) [GCC 11.2.0]"
},
"vscode": {
"interpreter": {
Expand Down
210 changes: 109 additions & 101 deletions feathr_project/feathr/client.py

Large diffs are not rendered by default.

4 changes: 1 addition & 3 deletions feathr_project/feathr/registry/feature_registry.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from abc import ABC, abstractmethod
from pathlib import Path

from typing import Any, Dict, List, Optional, Tuple
from typing import List, Tuple
from feathr.definition.feature_derivations import DerivedFeature
from feathr.definition.anchor import FeatureAnchor
from feathr.utils._envvariableutil import _EnvVaraibleUtil

class FeathrRegistry(ABC):
"""This is the abstract class for all the feature registries. All the feature registries should implement those interfaces.
Expand Down
120 changes: 120 additions & 0 deletions feathr_project/feathr/utils/_env_config_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import os
from pathlib import Path
import yaml

from loguru import logger

from azure.core.exceptions import ResourceNotFoundError
from feathr.secrets.akv_client import AzureKeyVaultClient

class EnvConfigReader(object):
    """A utility class to read Feathr config variables from os environment variables,
    a config yaml file, or Azure Key Vault.

    If a key is set in the environment variables, EnvConfigReader returns the value of
    that environment variable unless `use_env_vars` is set to False.
    """
    # Azure Key Vault name to use for retrieving config values (None disables AKV lookup).
    akv_name: str = None
    # Parsed YAML config file content (None if no readable config file was given).
    yaml_config: dict = None

    def __init__(self, config_path: str, use_env_vars: bool = True):
        """Initialize the utility class.

        Args:
            config_path: Config file path. May be None or point to a non-existent file,
                in which case the yaml config source is simply skipped.
            use_env_vars (optional): Whether to look up os environment variables before
                the config file. Defaults to True.
        """
        if config_path is not None:
            config_path = Path(config_path)
            if config_path.is_file():
                try:
                    self.yaml_config = yaml.safe_load(config_path.read_text())
                except yaml.YAMLError as e:
                    # A malformed config file is not fatal; fall back to the other sources.
                    logger.warning(e)

        self.use_env_vars = use_env_vars

        # The AKV name itself may come from the environment or the config file.
        # At this point `self.akv_name` is still the class default (None), so `get`
        # cannot recurse into an AKV lookup.
        self.akv_name = self.get("secrets__azure_key_vault__name")
        self.akv_client = AzureKeyVaultClient(self.akv_name) if self.akv_name else None

    def get(self, key: str, default: str = None) -> str:
        """Gets the Feathr config variable for the given key.
        It will retrieve the value in the following order:
        - From the environment variable if `use_env_vars == True` and the key is set in the os environment variables.
        - From the config yaml file if the key exists.
        - From the Azure Key Vault.
        If the key is not found in any of the above, it will return `default`.

        Args:
            key: Config variable name. For example, `SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL`
            default (optional): Default value to return if the key is not found. Defaults to None.

        Returns:
            Feathr client's config value.
        """
        # Check `is not None` explicitly (rather than chaining with `or`) so that
        # legitimate falsy values such as "" or "0" from one source are not silently
        # overridden by a later source or the default.
        if self.use_env_vars:
            conf_var = self._get_variable_from_env(key)
            if conf_var is not None:
                return conf_var

        if self.yaml_config:
            conf_var = self._get_variable_from_file(key)
            if conf_var is not None:
                return conf_var

        if self.akv_name:
            conf_var = self._get_variable_from_akv(key)
            if conf_var is not None:
                return conf_var

        return default

    def get_from_env_or_akv(self, key: str) -> str:
        """Gets the Feathr config variable for the given key. This function ignores the `use_env_vars`
        attribute and forces a lookup from the environment variables or Azure Key Vault.
        It will retrieve the value in the following order:
        - From the environment variable if the key is set in the os environment variables.
        - From the Azure Key Vault.
        If the key is not found in any of the above, it will return None.

        Args:
            key: Config variable name. For example, `ADLS_ACCOUNT`

        Returns:
            Feathr client's config value.
        """
        conf_var = self._get_variable_from_env(key)
        if conf_var is not None:
            return conf_var

        if self.akv_name:
            return self._get_variable_from_akv(key)

        return None

    def _get_variable_from_env(self, key: str) -> str:
        """Look up `key` in the os environment variables, trying both cases."""
        # Make it work for lower case and upper case.
        conf_var = os.environ.get(key.lower(), os.environ.get(key.upper()))

        if conf_var is None:
            logger.info(f"Config {key} is not set in the environment variables.")

        return conf_var

    def _get_variable_from_akv(self, key: str) -> str:
        """Look up `key` as a secret in the configured Azure Key Vault."""
        try:
            # Azure Key Vault object name is case in-sensitive.
            # https://learn.microsoft.com/en-us/azure/key-vault/general/about-keys-secrets-certificates#vault-name-and-object-name
            return self.akv_client.get_feathr_akv_secret(key)
        except ResourceNotFoundError:
            # ResourceNotFoundError here means the *secret* was missing, not the vault.
            logger.warning(f"Secret {key} not found in Azure Key Vault {self.akv_name}.")

        return None

    def _get_variable_from_file(self, key: str) -> str:
        """Look up `key` in the yaml config, treating `__` as a nesting separator."""
        args = key.split("__")
        try:
            conf_var = self.yaml_config
            for arg in args:
                if conf_var is None:
                    break
                # Make it work for lower case and upper case.
                conf_var = conf_var.get(arg.lower(), conf_var.get(arg.upper()))

            if conf_var is None:
                logger.info(f"Config {key} is not found in the config file.")

            return conf_var
        except Exception as e:
            # e.g. an intermediate node is a scalar (no `.get`); treat as not found.
            logger.warning(e)

        return None
91 changes: 0 additions & 91 deletions feathr_project/feathr/utils/_envvariableutil.py

This file was deleted.

41 changes: 17 additions & 24 deletions feathr_project/feathr/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,22 @@
}
}


# New databricks job cluster config
DEFAULT_DATABRICKS_CLUSTER_CONFIG = {
"spark_version": "11.2.x-scala2.12",
"node_type_id": "Standard_D3_v2",
"num_workers": 2,
"node_type_id": "Standard_D3_v2", # Change this if necessary
"num_workers": 1,
"spark_conf": {
"FEATHR_FILL_IN": "FEATHR_FILL_IN",
# Exclude conflicting packages if use feathr <= v0.8.0:
"spark.jars.excludes": "commons-logging:commons-logging,org.slf4j:slf4j-api,com.google.protobuf:protobuf-java,javax.xml.bind:jaxb-api",
},
}


# New Azure Synapse spark pool config
DEFAULT_AZURE_SYNAPSE_SPARK_POOL_CONFIG = {
"executor_size": "Small",
"executor_num": 2,
"executor_num": 1,
}


Expand All @@ -59,16 +57,9 @@ def generate_config(
databricks_cluster_id: str = None,
redis_password: str = None,
adls_key: str = None,
use_env_vars: bool = True,
**kwargs,
) -> str:
"""Generate a feathr config yaml file.
Note, `use_env_vars` argument gives an option to either use environment variables for generating the config file
or not. Feathr client will use environment variables anyway if they are set.

Keyword arguments follow the same naming convention as the feathr config. E.g. to set Databricks as the target
cluster, use `spark_config__spark_cluster="databricks"`.
See https://feathr-ai.github.io/feathr/quickstart_synapse.html#step-4-update-feathr-config for more details.

Note:
This utility function assumes Azure resources are deployed using the Azure Resource Manager (ARM) template,
Expand All @@ -78,14 +69,16 @@ def generate_config(
Args:
resource_prefix: Resource name prefix used when deploying Feathr resources by using ARM template.
project_name: Feathr project name.
cluster_name (optional): Databricks cluster or Azure Synapse spark pool name to use an existing one.
output_filepath (optional): Output filepath.
use_env_vars (optional): Whether to use environment variables if they are set.
databricks_workspace_token_value (optional): Databricks workspace token. If provided, the value will be stored
as the environment variable.
databricks_cluster_id (optional): Databricks cluster id to use an existing cluster.
redis_password (optional): Redis password. If provided, the value will be stored as the environment variable.
adls_key (optional): ADLS key. If provided, the value will be stored as the environment variable.
**kwargs: Keyword arguments to update the config. Keyword arguments follow the same naming convention as
the feathr config. E.g. to set Databricks as the target cluster,
use `spark_config__spark_cluster="databricks"`.
See https://feathr-ai.github.io/feathr/quickstart_synapse.html#step-4-update-feathr-config for more details.

Returns:
str: Generated config file path. This will be identical to `output_filepath` if provided.
Expand All @@ -100,6 +93,16 @@ def generate_config(

# Set configs
config = deepcopy(DEFAULT_FEATHR_CONFIG)

# Maybe update configs with environment variables
_maybe_update_config_with_env_var(config, "SPARK_CONFIG__SPARK_CLUSTER")
_maybe_update_config_with_env_var(config, "SPARK_CONFIG__AZURE_SYNAPSE__DEV_URL")
_maybe_update_config_with_env_var(config, "SPARK_CONFIG__AZURE_SYNAPSE__POOL_NAME")
_maybe_update_config_with_env_var(config, "SPARK_CONFIG__AZURE_SYNAPSE__WORKSPACE_DIR")
_maybe_update_config_with_env_var(config, "SPARK_CONFIG__DATABRICKS__WORK_DIR")
_maybe_update_config_with_env_var(config, "SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL")
_maybe_update_config_with_env_var(config, "SPARK_CONFIG__DATABRICKS__CONFIG_TEMPLATE")

config["project_config"]["project_name"] = project_name
config["feature_registry"]["api_endpoint"] = f"https://{resource_prefix}webapp.azurewebsites.net/api/v1"
config["online_store"]["redis"]["host"] = f"{resource_prefix}redis.redis.cache.windows.net"
Expand All @@ -124,16 +127,6 @@ def generate_config(
cluster_id=databricks_cluster_id,
)

# Maybe update configs with environment variables
if use_env_vars:
_maybe_update_config_with_env_var(config, "SPARK_CONFIG__SPARK_CLUSTER")
_maybe_update_config_with_env_var(config, "SPARK_CONFIG__AZURE_SYNAPSE__DEV_URL")
_maybe_update_config_with_env_var(config, "SPARK_CONFIG__AZURE_SYNAPSE__POOL_NAME")
_maybe_update_config_with_env_var(config, "SPARK_CONFIG__AZURE_SYNAPSE__WORKSPACE_DIR")
_maybe_update_config_with_env_var(config, "SPARK_CONFIG__DATABRICKS__WORK_DIR")
_maybe_update_config_with_env_var(config, "SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL")
_maybe_update_config_with_env_var(config, "SPARK_CONFIG__DATABRICKS__CONFIG_TEMPLATE")

# Verify config
_verify_config(config)

Expand Down
Loading