Merge branch 'main' into jumin/fix_xdist
Signed-off-by: Jun Ki Min <[email protected]>
loomlike committed Nov 15, 2022
2 parents c049958 + 14f0f12 commit 56974db
Showing 21 changed files with 307 additions and 210 deletions.
32 changes: 18 additions & 14 deletions .github/workflows/docker-publish.yml
@@ -15,7 +15,6 @@ on:
branches:
- 'releases/**'


jobs:
build_and_push_image_to_registry:
name: Push Docker image to Docker Hub
@@ -45,27 +44,32 @@ jobs:
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}

# Deploy the docker container to the three test environments for feathr
# Trigger Azure Web App webhooks to pull the latest nightly image
deploy:
runs-on: ubuntu-latest
needs: build_and_push_image_to_registry



steps:
- name: Deploy to Feathr Purview Registry Azure Web App
id: deploy-to-purview-webapp
- name: Deploy to Azure Web App feathr-registry-purview
id: deploy-to-feathr-registry-purview
uses: distributhor/[email protected]
env:
webhook_url: ${{ secrets.AZURE_WEBAPP_FEATHR_REGISTRY_PURVIEW_WEBHOOK }}

- name: Deploy to Azure Web App feathr-registry-purview-rbac
id: deploy-to-feathr-registry-purview-rbac
uses: distributhor/[email protected]
env:
webhook_url: ${{ secrets.AZURE_WEBAPP_FEATHR_PURVIEW_REGISTRY_WEBHOOK }}
webhook_url: ${{ secrets.AZURE_WEBAPP_FEATHR_REGISTRY_PURVIEW_RBAC_WEBHOOK }}

- name: Deploy to Feathr RBAC Registry Azure Web App
id: deploy-to-rbac-webapp
- name: Deploy to Azure Web App feathr-registry-sql
id: deploy-to-feathr-registry-sql
uses: distributhor/[email protected]
env:
webhook_url: ${{ secrets.AZURE_WEBAPP_FEATHR_RBAC_REGISTRY_WEBHOOK }}
- name: Deploy to Feathr SQL Registry Azure Web App
id: deploy-to-sql-webapp
webhook_url: ${{ secrets.AZURE_WEBAPP_FEATHR_REGISTRY_SQL_WEBHOOK }}

- name: Deploy to Azure Web App feathr-registry-sql-rbac
id: deploy-to-feathr-registry-sql-rbac
uses: distributhor/[email protected]
env:
webhook_url: ${{ secrets.AZURE_WEBAPP_FEATHR_SQL_REGISTRY_WEBHOOK }}
webhook_url: ${{ secrets.AZURE_WEBAPP_FEATHR_REGISTRY_SQL_RBAC_WEBHOOK }}
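
Each deploy step above uses the `distributhor/workflow-webhook` action, which effectively issues an HTTP POST to the Azure Web App's webhook URL so the App Service pulls the latest nightly image. A rough, hypothetical Python equivalent (the URL below is a placeholder; the real value comes from the `AZURE_WEBAPP_*` repository secrets shown above):

```python
# Rough sketch of what each deploy step boils down to: POST to the App Service
# continuous-deployment webhook. The URL below is a placeholder only.
import requests

webhook_url = "https://feathr-registry-purview.scm.azurewebsites.net/docker/hook"  # placeholder
response = requests.post(webhook_url, timeout=30)
response.raise_for_status()  # any 2xx response means the webhook was accepted
```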
110 changes: 58 additions & 52 deletions docs/how-to-guides/feathr-configuration-and-env.md

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion docs/how-to-guides/feathr-input-format.md
@@ -1,6 +1,6 @@
---
layout: default
title: Input File Format for Feathr
title: Input File for Feathr
parent: How-to Guides
---

@@ -18,3 +18,10 @@ Many Spark users will use delta lake format to store the results. In those cases
![Spark Output](../images/spark-output.png)

Please note that although the results are shown as "parquet", you should use the path of the parent folder and use `delta` format to read the folder.

# TimePartitionPattern for input files
When data sources are defined with `HdfsSource`, Feathr supports `time_partition_pattern` to match the paths of input files. For example, given `time_partition_pattern = 'yyyy/MM/dd'` and a `base_path`, all available input files under `{base_path}/yyyy/MM/dd` will be picked up and used as data sources.

More reference on the APIs:

- [MaterializationSettings API doc](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.MaterializationSettings)
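
A minimal sketch of using this from the Python client (the storage path is a placeholder; `time_partition_pattern` is the new `HdfsSource` argument added in this commit):

```python
from feathr import HdfsSource

# Files under {base_path}/yyyy/MM/dd (e.g. .../2022/11/15) are picked up as
# input data. The path below is a placeholder, not a real storage account.
batch_source = HdfsSource(
    name="timePartitionedSource",
    path="wasbs://[email protected]/data/somePath/daily",
    time_partition_pattern="yyyy/MM/dd",
)
```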
8 changes: 4 additions & 4 deletions docs/samples/customer360/Customer360.ipynb
@@ -194,8 +194,8 @@
" - 'REDIS_PASSWORD'\n",
" - 'ADLS_ACCOUNT'\n",
" - 'ADLS_KEY'\n",
" - 'WASB_ACCOUNT'\n",
" - 'WASB_KEY'\n",
" - 'BLOB_ACCOUNT'\n",
" - 'BLOB_KEY'\n",
" - 'DATABRICKS_WORKSPACE_TOKEN_VALUE '\n",
" \n",
"offline_store:\n",
@@ -327,8 +327,8 @@
"os.environ['REDIS_PASSWORD'] = ''\n",
"os.environ['ADLS_ACCOUNT'] = ''\n",
"os.environ['ADLS_KEY'] = ''\n",
"os.environ['WASB_ACCOUNT'] = \"\"\n",
"os.environ['WASB_KEY'] = ''\n",
"os.environ['BLOB_ACCOUNT'] = \"\"\n",
"os.environ['BLOB_KEY'] = ''\n",
"os.environ['DATABRICKS_WORKSPACE_TOKEN_VALUE'] = ''"
]
},
7 changes: 5 additions & 2 deletions feathr_project/feathr/client.py
@@ -10,6 +10,7 @@
from jinja2 import Template
from pyhocon import ConfigFactory
import redis
from loguru import logger

from feathr.constants import *
from feathr.definition._materialization_utils import _to_materialization_config
@@ -33,7 +34,7 @@
from feathr.utils.feature_printer import FeaturePrinter
from feathr.utils.spark_job_params import FeatureGenerationJobParams, FeatureJoinJobParams
from feathr.definition.source import InputContext

from feathr.version import get_version

class FeathrClient(object):
"""Feathr client.
@@ -172,6 +173,8 @@ def __init__(self, config_path:str = "./feathr_config.yaml", local_workspace_dir
# initialize registry
self.registry = default_registry_client(self.project_name, config_path=config_path, credential=self.credential)

logger.info(f"Feathr Client {get_version()} initialized successfully")

def _check_required_environment_variables_exist(self):
"""Checks if the required environment variables(form feathr_config.yaml) is set.
@@ -881,4 +884,4 @@ def _reshape_config_str(self, config_str:str):
if self.spark_runtime == 'local':
return "'{" + config_str + "}'"
else:
return config_str
return config_str
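
With this change, constructing the client now logs the SDK version on successful initialization. A minimal sketch (using the constructor's default config path shown above):

```python
from feathr import FeathrClient

# Logs "Feathr Client <version> initialized successfully" once setup completes.
client = FeathrClient(config_path="./feathr_config.yaml")
```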
5 changes: 0 additions & 5 deletions feathr_project/feathr/constants.py
@@ -28,11 +28,6 @@
TYPEDEF_ARRAY_DERIVED_FEATURE=f"array<feathr_derived_feature_{REGISTRY_TYPEDEF_VERSION}>"
TYPEDEF_ARRAY_ANCHOR_FEATURE=f"array<feathr_anchor_feature_{REGISTRY_TYPEDEF_VERSION}>"

# Decouple Feathr MAVEN Version from Feathr Python SDK Version
import os
from feathr.version import __version__
FEATHR_MAVEN_VERSION = os.environ.get("FEATHR_MAVEN_VERSION", __version__)
FEATHR_MAVEN_ARTIFACT=f"com.linkedin.feathr:feathr_2.12:{FEATHR_MAVEN_VERSION}"

JOIN_CLASS_NAME="com.linkedin.feathr.offline.job.FeatureJoinJob"
GEN_CLASS_NAME="com.linkedin.feathr.offline.job.FeatureGenJob"
19 changes: 18 additions & 1 deletion feathr_project/feathr/definition/source.py
@@ -101,13 +101,27 @@ class HdfsSource(Source):
- `epoch_millis` (milliseconds since epoch), for example `1647737517761`
- Any date formats supported by [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html).
registry_tags: A dict of (str, str) that you can pass to feature registry for better organization. For example, you can use {"deprecated": "true"} to indicate this source is deprecated, etc.
time_partition_pattern (Optional[str]): Format of the time-partitioned feature data, e.g. yyyy/MM/dd. All formats supported by dateTimeFormatter are allowed.
config:
timeSnapshotHdfsSource:
{
location:
{
path: "/data/somePath/daily"
}
timePartitionPattern: "yyyy/MM/dd"
}
Given the above HDFS path /data/somePath/daily,
the expectation is that the following subdirectories exist:
/data/somePath/daily/{yyyy}/{MM}/{dd}
"""

def __init__(self, name: str, path: str, preprocessing: Optional[Callable] = None, event_timestamp_column: Optional[str] = None, timestamp_format: Optional[str] = "epoch", registry_tags: Optional[Dict[str, str]] = None) -> None:
def __init__(self, name: str, path: str, preprocessing: Optional[Callable] = None, event_timestamp_column: Optional[str] = None, timestamp_format: Optional[str] = "epoch", registry_tags: Optional[Dict[str, str]] = None, time_partition_pattern: Optional[str] = None) -> None:
super().__init__(name, event_timestamp_column,
timestamp_format, registry_tags=registry_tags)
self.path = path
self.preprocessing = preprocessing
self.time_partition_pattern = time_partition_pattern
if path.startswith("http"):
logger.warning(
"Your input path {} starts with http, which is not supported. Consider using paths starting with wasb[s]/abfs[s]/s3.", path)
@@ -116,6 +130,9 @@ def to_feature_config(self) -> str:
tm = Template("""
{{source.name}}: {
location: {path: "{{source.path}}"}
{% if source.time_partition_pattern %}
timePartitionPattern: "{{source.time_partition_pattern}}"
{% endif %}
{% if source.event_timestamp_column %}
timeWindowParameters: {
timestampColumn: "{{source.event_timestamp_column}}"
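
Based on the template above, a source with `time_partition_pattern` set should render roughly as follows (the source name and path are illustrative, and the exact whitespace may differ):

```python
from feathr import HdfsSource

src = HdfsSource(
    name="dailySource",                 # illustrative name
    path="/data/somePath/daily",
    time_partition_pattern="yyyy/MM/dd",
)
print(src.to_feature_config())
# dailySource: {
#     location: {path: "/data/somePath/daily"}
#     timePartitionPattern: "yyyy/MM/dd"
#     ...
# }
```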
3 changes: 1 addition & 2 deletions feathr_project/feathr/registry/_feature_registry_purview.py
@@ -744,8 +744,7 @@ def upload_single_entity_to_purview(self,entity:Union[AtlasEntity,AtlasProcess])
"""
Try to find existing entity/process first, if found, return the existing entity's GUID
"""
id = self.get_entity_id(entity.qualifiedName)
response = self.purview_client.get_entity(id)['entities'][0]
response = self.purview_client.get_entity(qualifiedName=entity.qualifiedName)['entities'][0]
j = entity.to_json()
if j["typeName"] == response["typeName"]:
if j["typeName"] == "Process":
@@ -17,6 +17,7 @@
from requests.structures import CaseInsensitiveDict

from feathr.constants import *
from feathr.version import get_maven_artifact_fullname
from feathr.spark_provider._abc import SparkJobLauncher


@@ -196,8 +197,8 @@ def submit_feathr_job(

# the feathr main jar file is needed regardless of whether it's pyspark or scala spark
if not main_jar_path:
logger.info(f"Main JAR file is not set, using default package '{FEATHR_MAVEN_ARTIFACT}' from Maven")
submission_params["libraries"][0]["maven"] = {"coordinates": FEATHR_MAVEN_ARTIFACT}
logger.info(f"Main JAR file is not set, using default package '{get_maven_artifact_fullname()}' from Maven")
submission_params['libraries'][0]['maven'] = { "coordinates": get_maven_artifact_fullname() }
else:
submission_params["libraries"][0]["jar"] = self.upload_or_get_cloud_path(main_jar_path)
# see here for the submission parameter definition https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/2.0/jobs#--request-structure-6
@@ -11,7 +11,7 @@
from loguru import logger
from pyspark import *

from feathr.constants import FEATHR_MAVEN_ARTIFACT, OUTPUT_PATH_TAG
from feathr.version import get_maven_artifact_fullname
from feathr.spark_provider._abc import SparkJobLauncher


@@ -81,7 +81,7 @@ def submit_feathr_job(

# Get conf and package arguments
cfg = configuration.copy() if configuration else {}
maven_dependency = f"{cfg.pop('spark.jars.packages', self.packages)},{FEATHR_MAVEN_ARTIFACT}"
maven_dependency = f"{cfg.pop('spark.jars.packages', self.packages)},{get_maven_artifact_fullname()}"
spark_args = self._init_args(job_name=job_name, confs=cfg)

if not main_jar_path:
Expand All @@ -90,7 +90,7 @@ def submit_feathr_job(
# This is a JAR job
# Azure Synapse/Livy doesn't allow JAR job starts from Maven directly, we must have a jar file uploaded.
# so we have to use a dummy jar as the main file.
logger.info(f"Main JAR file is not set, using default package '{FEATHR_MAVEN_ARTIFACT}' from Maven")
logger.info(f"Main JAR file is not set, using default package '{get_maven_artifact_fullname()}' from Maven")
# Use the no-op jar as the main file
# This is a dummy jar which contains only one `org.example.Noop` class with one empty `main` function
# which does nothing
13 changes: 8 additions & 5 deletions feathr_project/feathr/spark_provider/_synapse_submission.py
@@ -22,6 +22,7 @@

from feathr.spark_provider._abc import SparkJobLauncher
from feathr.constants import *
from feathr.version import get_maven_artifact_fullname

class LivyStates(Enum):
""" Adapt LivyStates over to relax the dependency for azure-synapse-spark pacakge.
@@ -114,12 +115,12 @@ def submit_feathr_job(self, job_name: str, main_jar_path: str = None, main_clas
if not main_jar_path:
# We don't have the main jar, use Maven
# Add Maven dependency to the job configuration
logger.info(f"Main JAR file is not set, using default package '{FEATHR_MAVEN_ARTIFACT}' from Maven")
logger.info(f"Main JAR file is not set, using default package '{get_maven_artifact_fullname()}' from Maven")
if "spark.jars.packages" in cfg:
cfg["spark.jars.packages"] = ",".join(
[cfg["spark.jars.packages"], FEATHR_MAVEN_ARTIFACT])
[cfg["spark.jars.packages"], get_maven_artifact_fullname()])
else:
cfg["spark.jars.packages"] = FEATHR_MAVEN_ARTIFACT
cfg["spark.jars.packages"] = get_maven_artifact_fullname()

if not python_files:
# This is a JAR job
@@ -169,7 +170,7 @@ def submit_feathr_job(self, job_name: str, main_jar_path: str = None, main_clas
def wait_for_completion(self, timeout_seconds: Optional[float]) -> bool:
"""
Returns true if the job completed successfully
"""
"""
start_time = time.time()
while (timeout_seconds is None) or (time.time() - start_time < timeout_seconds):
status = self.get_status()
@@ -178,7 +179,9 @@ def wait_for_completion(self, timeout_seconds: Optional[float]) -> bool:
return True
elif status in {LivyStates.ERROR.value, LivyStates.DEAD.value, LivyStates.KILLED.value}:
logger.error("Feathr job has failed.")
logger.error(self._api.get_driver_log(self.current_job_info.id).decode('utf-8'))
error_msg = self._api.get_driver_log(self.current_job_info.id).decode('utf-8')
logger.error(error_msg)
logger.error("The size of the whole error log is: {}. The logs might be truncated in some cases (such as in Visual Studio Code) so only the top a few lines of the error message is displayed. If you cannot see the whole log, you may want to extend the setting for output size limit.", len(error_msg))
return False
else:
time.sleep(30)
11 changes: 10 additions & 1 deletion feathr_project/feathr/version.py
@@ -1 +1,10 @@
__version__ = "0.9.0-rc2"
__version__ = "0.9.0-rc2"

def get_version():
return __version__

# Decouple Feathr MAVEN Version from Feathr Python SDK Version
import os
def get_maven_artifact_fullname():
maven_artifact_version = os.environ.get("MAVEN_ARTIFACT_VERSION", __version__)
return f"com.linkedin.feathr:feathr_2.12:{maven_artifact_version}"
@@ -25,8 +25,8 @@ project_config:
# the environment variables are optional, however you will need them if you want to use some of the services:
- ADLS_ACCOUNT
- ADLS_KEY
- WASB_ACCOUNT
- WASB_KEY
- BLOB_ACCOUNT
- BLOB_KEY
- S3_ACCESS_KEY
- S3_SECRET_KEY
- JDBC_TABLE
@@ -41,7 +41,7 @@ offline_store:
adls_enabled: true

# paths starts with wasb:// or wasbs://
# WASB_ACCOUNT and WASB_KEY should be set in environment variable
# BLOB_ACCOUNT and BLOB_KEY should be set as environment variables
wasb:
wasb_enabled: true

@@ -118,8 +118,8 @@ feature_registry:
delimiter: "__"
# controls whether the type system will be initialized or not. Usually this is only required to be executed once.
type_system_initialization: false


secrets:
azure_key_vault:
name: feathrazuretest3-kv