Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for arrays in snowflake #3769

Merged
merged 1 commit into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import contextlib
import json
import os
import uuid
import warnings
Expand Down Expand Up @@ -51,6 +52,17 @@
)
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.types import (
Array,
Bool,
Bytes,
Float32,
Float64,
Int32,
Int64,
String,
UnixTimestamp,
)
from feast.usage import log_exceptions_and_usage

try:
Expand Down Expand Up @@ -320,6 +332,7 @@ def query_generator() -> Iterator[str]:
on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs(
feature_refs, project, registry
),
feature_views=feature_views,
metadata=RetrievalMetadata(
features=feature_refs,
keys=list(entity_schema.keys() - {entity_df_event_timestamp_col}),
Expand Down Expand Up @@ -398,9 +411,12 @@ def __init__(
config: RepoConfig,
full_feature_names: bool,
on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
feature_views: Optional[List[FeatureView]] = None,
metadata: Optional[RetrievalMetadata] = None,
):

if feature_views is None:
feature_views = []
if not isinstance(query, str):
self._query_generator = query
else:
Expand All @@ -416,6 +432,7 @@ def query_generator() -> Iterator[str]:
self.config = config
self._full_feature_names = full_feature_names
self._on_demand_feature_views = on_demand_feature_views or []
self._feature_views = feature_views
self._metadata = metadata
self.export_path: Optional[str]
if self.config.offline_store.blob_export_location:
Expand All @@ -436,6 +453,20 @@ def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
self.snowflake_conn, self.to_sql()
).fetch_pandas_all()

for feature_view in self._feature_views:
for feature in feature_view.features:
if feature.dtype in [
Array(String),
Array(Bytes),
Array(Int32),
Array(Int64),
Array(UnixTimestamp),
Array(Float64),
Array(Float32),
Array(Bool),
]:
df[feature.name] = [json.loads(x) for x in df[feature.name]]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These arrays arrive as JSON strings, so this converts each one to a Python list.


return df

def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/offline_stores/snowflake_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,12 @@ def get_table_column_names_and_types(
else:
row["snowflake_type"] = "NUMBERwSCALE"

elif row["type_code"] in [5, 9, 10, 12]:
elif row["type_code"] in [5, 9, 12]:
error = snowflake_unsupported_map[row["type_code"]]
raise NotImplementedError(
f"The following Snowflake Data Type is not supported: {error}"
)
elif row["type_code"] in [1, 2, 3, 4, 6, 7, 8, 11, 13]:
elif row["type_code"] in [1, 2, 3, 4, 6, 7, 8, 10, 11, 13]:
row["snowflake_type"] = snowflake_type_code_map[row["type_code"]]
else:
raise NotImplementedError(
Expand All @@ -305,14 +305,14 @@ def get_table_column_names_and_types(
6: "TIMESTAMP_LTZ",
7: "TIMESTAMP_TZ",
8: "TIMESTAMP_NTZ",
10: "ARRAY",
11: "BINARY",
13: "BOOLEAN",
}

snowflake_unsupported_map = {
5: "VARIANT -- Try converting to VARCHAR",
9: "OBJECT -- Try converting to VARCHAR",
10: "ARRAY -- Try converting to VARCHAR",
12: "TIME -- Try converting to VARCHAR",
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,62 @@ CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_varchar_to_string_pro
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_varchar_to_string_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

-- Python UDF for bytes arrays: accepts an ARRAY argument and returns BINARY.
-- Created only if absent; the handler is loaded from feast.zip on the stage.
-- NOTE(review): PROJECT_NAME / STAGE_HOLDER look like placeholders substituted before execution — confirm.
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_bytes_to_list_bytes_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_bytes_to_list_bytes_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

-- Python UDF for varchar arrays: accepts an ARRAY argument and returns BINARY.
-- Created only if absent; the handler is loaded from feast.zip on the stage.
-- NOTE(review): PROJECT_NAME / STAGE_HOLDER look like placeholders substituted before execution — confirm.
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_varchar_to_list_string_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_varchar_to_list_string_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

-- Python UDF for number arrays (int32 variant): accepts an ARRAY argument and returns BINARY.
-- Created only if absent; the handler is loaded from feast.zip on the stage.
-- NOTE(review): PROJECT_NAME / STAGE_HOLDER look like placeholders substituted before execution — confirm.
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_number_to_list_int32_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int32_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

-- Python UDF for number arrays (int64 variant): accepts an ARRAY argument and returns BINARY.
-- Created only if absent; the handler is loaded from feast.zip on the stage.
-- NOTE(review): PROJECT_NAME / STAGE_HOLDER look like placeholders substituted before execution — confirm.
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_number_to_list_int64_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int64_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

-- Python UDF for float arrays: accepts an ARRAY argument and returns BINARY.
-- Created only if absent; the handler is loaded from feast.zip on the stage.
-- NOTE(review): PROJECT_NAME / STAGE_HOLDER look like placeholders substituted before execution — confirm.
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_float_to_list_double_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_float_to_list_double_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

-- Python UDF for boolean arrays: accepts an ARRAY argument and returns BINARY.
-- Created only if absent; the handler is loaded from feast.zip on the stage.
-- NOTE(review): PROJECT_NAME / STAGE_HOLDER look like placeholders substituted before execution — confirm.
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_boolean_to_list_bool_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_boolean_to_list_bool_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

-- Python UDF for timestamp arrays: accepts an ARRAY argument and returns BINARY.
-- Created only if absent; the handler is loaded from feast.zip on the stage.
-- NOTE(review): PROJECT_NAME / STAGE_HOLDER look like placeholders substituted before execution — confirm.
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_timestamp_to_list_unix_timestamp_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_timestamp_to_list_unix_timestamp_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_number_to_int32_proto(df NUMBER)
RETURNS BINARY
LANGUAGE PYTHON
Expand Down
175 changes: 175 additions & 0 deletions sdk/python/feast/infra/utils/snowflake/snowpark/snowflake_udfs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sys
from binascii import unhexlify

import numpy as np
import pandas
from _snowflake import vectorized

Expand Down Expand Up @@ -59,6 +60,180 @@ def feast_snowflake_varchar_to_string_proto(df):
return df


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_bytes_to_list_bytes_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_bytes_to_list_bytes_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""
# ValueType.BYTES_LIST = 11
@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_bytes_to_list_bytes_proto(df):
    """Serialize each array in column 0 of ``df`` as a BYTES_LIST value proto.

    Args:
        df: Batch DataFrame handed over by the ``vectorized`` wrapper; only
            column ``0`` is read.

    Returns:
        A list with one ``ValueProto.SerializeToString`` result per proto
        value produced by ``python_values_to_proto_values``.
    """
    # Tag the session so the work is attributed to feast.
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    # Elements can arrive as strings, so cast them to bytes before building
    # the protos.
    # NOTE(review): np.asarray over rows of differing lengths does not give a
    # regular 2-D array — confirm ragged arrays are handled as intended.
    rows_as_bytes = np.asarray(df[0].to_list()).astype(bytes)

    proto_values = python_values_to_proto_values(rows_as_bytes, ValueType.BYTES_LIST)
    return [ValueProto.SerializeToString(value) for value in proto_values]


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_varchar_to_list_string_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_varchar_to_list_string_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""


@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_varchar_to_list_string_proto(df):
    """Serialize each array in column 0 of ``df`` as a STRING_LIST value proto.

    Args:
        df: Batch DataFrame handed over by the ``vectorized`` wrapper; only
            column ``0`` is read.

    Returns:
        A list with one ``ValueProto.SerializeToString`` result per proto
        value produced by ``python_values_to_proto_values``.
    """
    # Tag the session so the work is attributed to feast.
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    proto_values = python_values_to_proto_values(
        df[0].to_numpy(), ValueType.STRING_LIST
    )
    return [ValueProto.SerializeToString(value) for value in proto_values]


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_number_to_list_int32_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int32_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""


@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_number_to_list_int32_proto(df):
    """Serialize each array in column 0 of ``df`` as an INT32_LIST value proto.

    Args:
        df: Batch DataFrame handed over by the ``vectorized`` wrapper; only
            column ``0`` is read.

    Returns:
        A list with one ``ValueProto.SerializeToString`` result per proto
        value produced by ``python_values_to_proto_values``.
    """
    # Tag the session so the work is attributed to feast.
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    proto_values = python_values_to_proto_values(
        df[0].to_numpy(), ValueType.INT32_LIST
    )
    return [ValueProto.SerializeToString(value) for value in proto_values]


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_number_to_list_int64_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int64_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""


@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_number_to_list_int64_proto(df):
    """Serialize each array in column 0 of ``df`` as an INT64_LIST value proto.

    Args:
        df: Batch DataFrame handed over by the ``vectorized`` wrapper; only
            column ``0`` is read.

    Returns:
        A list with one ``ValueProto.SerializeToString`` result per proto
        value produced by ``python_values_to_proto_values``.
    """
    # Tag the session so the work is attributed to feast.
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    proto_values = python_values_to_proto_values(
        df[0].to_numpy(), ValueType.INT64_LIST
    )
    return [ValueProto.SerializeToString(value) for value in proto_values]


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_float_to_list_double_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_float_to_list_double_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""


@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_float_to_list_double_proto(df):
    """Serialize each array in column 0 of ``df`` as a DOUBLE_LIST value proto.

    Args:
        df: Batch DataFrame handed over by the ``vectorized`` wrapper; only
            column ``0`` is read.

    Returns:
        A list with one ``ValueProto.SerializeToString`` result per proto
        value produced by ``python_values_to_proto_values``.
    """
    # Tag the session so the work is attributed to feast.
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    # Cast every element to float before building the protos.
    # NOTE(review): np.asarray over rows of differing lengths does not give a
    # regular 2-D array — confirm ragged arrays are handled as intended.
    rows_as_float = np.asarray(df[0].to_list()).astype(float)

    proto_values = python_values_to_proto_values(rows_as_float, ValueType.DOUBLE_LIST)
    return [ValueProto.SerializeToString(value) for value in proto_values]


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_boolean_to_list_bool_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_boolean_to_list_bool_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""


@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_boolean_to_list_bool_proto(df):
    """Serialize each array in column 0 of ``df`` as a BOOL_LIST value proto.

    Args:
        df: Batch DataFrame handed over by the ``vectorized`` wrapper; only
            column ``0`` is read.

    Returns:
        A list with one ``ValueProto.SerializeToString`` result per proto
        value produced by ``python_values_to_proto_values``.
    """
    # Tag the session so the work is attributed to feast.
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    proto_values = python_values_to_proto_values(
        df[0].to_numpy(), ValueType.BOOL_LIST
    )
    return [ValueProto.SerializeToString(value) for value in proto_values]


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_timestamp_to_list_unix_timestamp_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_timestamp_to_list_unix_timestamp_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""


@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_timestamp_to_list_unix_timestamp_proto(df):
    """Serialize each array in column 0 of ``df`` as a UNIX_TIMESTAMP_LIST proto.

    Args:
        df: Batch DataFrame handed over by the ``vectorized`` wrapper; only
            column ``0`` is read.

    Returns:
        A list with one ``ValueProto.SerializeToString`` result per proto
        value produced by ``python_values_to_proto_values``.
    """
    # Tag the session so the work is attributed to feast.
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    # Cast every element to numpy datetime64 before building the protos.
    # NOTE(review): np.asarray over rows of differing lengths does not give a
    # regular 2-D array — confirm ragged arrays are handled as intended.
    rows_as_datetime = np.asarray(df[0].to_list()).astype(np.datetime64)

    proto_values = python_values_to_proto_values(
        rows_as_datetime, ValueType.UNIX_TIMESTAMP_LIST
    )
    return [ValueProto.SerializeToString(value) for value in proto_values]


"""
CREATE OR REPLACE FUNCTION feast_snowflake_number_to_int32_proto(df NUMBER)
RETURNS BINARY
Expand Down
8 changes: 8 additions & 0 deletions sdk/python/feast/type_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,14 @@ def _convert_value_name_to_snowflake_udf(value_name: str, project_name: str) ->
"FLOAT": f"feast_{project_name}_snowflake_float_to_double_proto",
"BOOL": f"feast_{project_name}_snowflake_boolean_to_bool_proto",
"UNIX_TIMESTAMP": f"feast_{project_name}_snowflake_timestamp_to_unix_timestamp_proto",
"BYTES_LIST": f"feast_{project_name}_snowflake_array_bytes_to_list_bytes_proto",
"STRING_LIST": f"feast_{project_name}_snowflake_array_varchar_to_list_string_proto",
"INT32_LIST": f"feast_{project_name}_snowflake_array_number_to_list_int32_proto",
"INT64_LIST": f"feast_{project_name}_snowflake_array_number_to_list_int64_proto",
"DOUBLE_LIST": f"feast_{project_name}_snowflake_array_float_to_list_double_proto",
"FLOAT_LIST": f"feast_{project_name}_snowflake_array_float_to_list_double_proto",
"BOOL_LIST": f"feast_{project_name}_snowflake_array_boolean_to_list_bool_proto",
"UNIX_TIMESTAMP_LIST": f"feast_{project_name}_snowflake_array_timestamp_to_list_unix_timestamp_proto",
}
return name_map[value_name].upper()

Expand Down
1 change: 1 addition & 0 deletions sdk/python/tests/data/data_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def get_feature_values_for_dtype(
"int64": [1, 2, 3, 4, 5],
"float": [1.0, None, 3.0, 4.0, 5.0],
"string": ["1", None, "3", "4", "5"],
"bytes": [b"1", None, b"3", b"4", b"5"],
"bool": [True, None, False, True, False],
"datetime": [
datetime(1980, 1, 1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@
"password": os.getenv("SNOWFLAKE_CI_PASSWORD", ""),
"role": os.getenv("SNOWFLAKE_CI_ROLE", ""),
"warehouse": os.getenv("SNOWFLAKE_CI_WAREHOUSE", ""),
"database": "FEAST",
"schema": "ONLINE",
"database": os.getenv("SNOWFLAKE_CI_DATABASE", "FEAST"),
"schema": os.getenv("SNOWFLAKE_CI_SCHEMA_ONLINE", "ONLINE"),
Comment on lines +86 to +87
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My company has locked down database creation, so it's easier for me to use my provided database/schema as a sandbox. This keeps the defaults the same but lets me choose the names during testing.

}

BIGTABLE_CONFIG = {
Expand Down
Loading
Loading