diff --git a/sdk/python/feast/embedded_go/online_features_service.py b/sdk/python/feast/embedded_go/online_features_service.py index bf82fab6a3..c6430b5f6d 100644 --- a/sdk/python/feast/embedded_go/online_features_service.py +++ b/sdk/python/feast/embedded_go/online_features_service.py @@ -252,6 +252,11 @@ def transformation_callback( # the typeguard requirement. full_feature_names = bool(full_feature_names) + if odfv.mode != "pandas": + raise Exception( + f"OnDemandFeatureView mode '{odfv.mode} not supported by EmbeddedOnlineFeatureServer." + ) + output = odfv.get_transformed_features_df( input_record.to_pandas(), full_feature_names=full_feature_names ) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 9ac2c14527..bfb8a59b2b 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1995,26 +1995,53 @@ def _augment_response_with_on_demand_transforms( ) initial_response = OnlineResponse(online_features_response) - initial_response_df = initial_response.to_df() + initial_response_df: Optional[pd.DataFrame] = None + initial_response_dict: Optional[Dict[str, List[Any]]] = None # Apply on demand transformations and augment the result rows odfv_result_names = set() for odfv_name, _feature_refs in odfv_feature_refs.items(): odfv = requested_odfv_map[odfv_name] - transformed_features_df = odfv.get_transformed_features_df( - initial_response_df, - full_feature_names, - ) - selected_subset = [ - f for f in transformed_features_df.columns if f in _feature_refs - ] - - proto_values = [ - python_values_to_proto_values( - transformed_features_df[feature].values, ValueType.UNKNOWN + if odfv.mode == "python": + if initial_response_dict is None: + initial_response_dict = initial_response.to_dict() + transformed_features_dict: Dict[ + str, List[Any] + ] = odfv.get_transformed_features( + initial_response_dict, + full_feature_names, ) - for feature in selected_subset - ] + elif odfv.mode in {"pandas", "substrait"}: + if initial_response_df is None: + initial_response_df = initial_response.to_df() + transformed_features_df: pd.DataFrame = odfv.get_transformed_features( + initial_response_df, + full_feature_names, + ) + else: + raise Exception( + f"Invalid OnDemandFeatureMode: {odfv.mode}. Expected one of 'pandas', 'python', or 'substrait'." + ) + + transformed_features = ( + transformed_features_dict + if odfv.mode == "python" + else transformed_features_df + ) + transformed_columns = ( + transformed_features.columns + if isinstance(transformed_features, pd.DataFrame) + else transformed_features + ) + selected_subset = [f for f in transformed_columns if f in _feature_refs] + + proto_values = [] + for selected_feature in selected_subset: + if odfv.mode in ["python", "pandas"]: + feature_vector = transformed_features[selected_feature] + proto_values.append( + python_values_to_proto_values(feature_vector, ValueType.UNKNOWN) + ) odfv_result_names |= set(selected_subset) diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 30135feccb..aaed78dd45 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -81,6 +81,10 @@ def to_df( if self.on_demand_feature_views: # TODO(adchia): Fix requirement to specify dependent feature views in feature_refs for odfv in self.on_demand_feature_views: + if odfv.mode not in {"pandas", "substrait"}: + raise Exception( + f'OnDemandFeatureView mode "{odfv.mode}" not supported for offline processing.' + ) features_df = features_df.join( odfv.get_transformed_features_df( features_df, @@ -124,6 +128,10 @@ def to_arrow( features_df = self._to_df_internal(timeout=timeout) if self.on_demand_feature_views: for odfv in self.on_demand_feature_views: + if odfv.mode != "pandas": + raise Exception( + f'OnDemandFeatureView mode "{odfv.mode}" not supported for offline processing.' + ) features_df = features_df.join( odfv.get_transformed_features_df( features_df, diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index ce416fff2a..8d51edbe58 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -32,6 +32,7 @@ UserDefinedFunctionV2 as UserDefinedFunctionProto, ) from feast.transformation.pandas_transformation import PandasTransformation +from feast.transformation.python_transformation import PythonTransformation from feast.transformation.substrait_transformation import SubstraitTransformation from feast.type_map import ( feast_value_type_to_pandas_type, @@ -68,7 +69,10 @@ class OnDemandFeatureView(BaseFeatureView): features: List[Field] source_feature_view_projections: Dict[str, FeatureViewProjection] source_request_sources: Dict[str, RequestSource] - feature_transformation: Union[PandasTransformation, SubstraitTransformation] + feature_transformation: Union[ + PandasTransformation, PythonTransformation, SubstraitTransformation + ] + mode: str description: str tags: Dict[str, str] owner: str @@ -88,9 +92,10 @@ def __init__( # noqa: C901 ], udf: Optional[FunctionType] = None, udf_string: str = "", - feature_transformation: Optional[ - Union[PandasTransformation, SubstraitTransformation] - ] = None, + feature_transformation: Union[ + PandasTransformation, PythonTransformation, SubstraitTransformation + ], + mode: str = "pandas", description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", @@ -109,6 +114,7 @@ def __init__( # noqa: C901 dataframes as inputs. udf_string (deprecated): The source code version of the udf (for diffing and displaying in Web UI) feature_transformation: The user defined transformation. + mode: Mode of execution (e.g., Pandas or Python native) description (optional): A human-readable description. tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the on demand feature view, typically the email @@ -122,16 +128,28 @@ def __init__( # noqa: C901 owner=owner, ) + if mode not in {"python", "pandas", "substrait"}: + raise Exception( + f"Unknown mode {mode}. OnDemandFeatureView only supports python or pandas UDFs and substrait." + ) + else: + self.mode = mode if not feature_transformation: if udf: warnings.warn( - "udf and udf_string parameters are deprecated. Please use transformation=OnDemandPandasTransformation(udf, udf_string) instead.", + "udf and udf_string parameters are deprecated. Please use transformation=PandasTransformation(udf, udf_string) instead.", DeprecationWarning, ) - feature_transformation = PandasTransformation(udf, udf_string) + # Note inspecting the return signature won't work with isinstance so this is the best alternative + if mode == "pandas": + feature_transformation = PandasTransformation(udf, udf_string) + elif mode == "python": + feature_transformation = PythonTransformation(udf, udf_string) + else: + pass else: raise Exception( - "OnDemandFeatureView needs to be initialized with either transformation or udf arguments" + "OnDemandFeatureView needs to be initialized with either feature_transformation or udf arguments" ) self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {} @@ -159,6 +177,7 @@ def __copy__(self): sources=list(self.source_feature_view_projections.values()) + list(self.source_request_sources.values()), feature_transformation=self.feature_transformation, + mode=self.mode, description=self.description, tags=self.tags, owner=self.owner, @@ -179,6 +198,7 @@ def __eq__(self, other): self.source_feature_view_projections != other.source_feature_view_projections or self.source_request_sources != other.source_request_sources + or self.mode != other.mode or self.feature_transformation != other.feature_transformation ): return False @@ -215,7 +235,10 @@ def to_proto(self) -> OnDemandFeatureViewProto: feature_transformation = FeatureTransformationProto( user_defined_function=self.feature_transformation.to_proto() - if isinstance(self.feature_transformation, PandasTransformation) + if isinstance( + self.feature_transformation, + (PandasTransformation, PythonTransformation), + ) else None, substrait_transformation=self.feature_transformation.to_proto() if isinstance(self.feature_transformation, SubstraitTransformation) @@ -226,6 +249,7 @@ def to_proto(self) -> OnDemandFeatureViewProto: features=[feature.to_proto() for feature in self.features], sources=sources, feature_transformation=feature_transformation, + mode=self.mode, description=self.description, tags=self.tags, owner=self.owner, @@ -234,12 +258,17 @@ def to_proto(self) -> OnDemandFeatureViewProto: return OnDemandFeatureViewProto(spec=spec, meta=meta) @classmethod - def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): + def from_proto( + cls, + on_demand_feature_view_proto: OnDemandFeatureViewProto, + skip_udf: bool = False, + ): """ Creates an on demand feature view from a protobuf representation. Args: on_demand_feature_view_proto: A protobuf representation of an on-demand feature view. + skip_udf: A boolean indicating whether to skip loading the udf Returns: A OnDemandFeatureView object based on the on-demand feature view protobuf. @@ -311,6 +340,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): ], sources=sources, feature_transformation=transformation, + mode=on_demand_feature_view_proto.spec.mode, description=on_demand_feature_view_proto.spec.description, tags=dict(on_demand_feature_view_proto.spec.tags), owner=on_demand_feature_view_proto.spec.owner, @@ -349,12 +379,17 @@ def get_request_data_schema(self) -> Dict[str, ValueType]: ) return schema + def _get_projected_feature_name(self, feature: str) -> str: + return f"{self.projection.name_to_use()}__{feature}" + def get_transformed_features_df( self, df_with_features: pd.DataFrame, full_feature_names: bool = False, ) -> pd.DataFrame: # Apply on demand transformations + if not isinstance(df_with_features, pd.DataFrame): + raise TypeError("get_transformed_features_df only accepts pd.DataFrame") columns_to_cleanup = [] for source_fv_projection in self.source_feature_view_projections.values(): for feature in source_fv_projection.features: @@ -369,16 +404,15 @@ def get_transformed_features_df( columns_to_cleanup.append(full_feature_ref) # Compute transformed values and apply to each result row - - df_with_transformed_features = self.feature_transformation.transform( - df_with_features + df_with_transformed_features: pd.DataFrame = ( + self.feature_transformation.transform(df_with_features) ) # Work out whether the correct columns names are used. rename_columns: Dict[str, str] = {} for feature in self.features: short_name = feature.name - long_name = f"{self.projection.name_to_use()}__{feature.name}" + long_name = self._get_projected_feature_name(feature.name) if ( short_name in df_with_transformed_features.columns and full_feature_names @@ -392,7 +426,133 @@ def get_transformed_features_df( df_with_features.drop(columns=columns_to_cleanup, inplace=True) return df_with_transformed_features.rename(columns=rename_columns) + def get_transformed_features_dict( + self, + feature_dict: Dict[str, Any], # type: ignore + ) -> Dict[str, Any]: + + # we need a mapping from full feature name to short and back to do a renaming + # The simplest thing to do is to make the full reference, copy the columns with the short reference + # and rerun + columns_to_cleanup: List[str] = [] + for source_fv_projection in self.source_feature_view_projections.values(): + for feature in source_fv_projection.features: + full_feature_ref = f"{source_fv_projection.name}__{feature.name}" + if full_feature_ref in feature_dict.keys(): + # Make sure the partial feature name is always present + feature_dict[feature.name] = feature_dict[full_feature_ref] + columns_to_cleanup.append(str(feature.name)) + elif feature.name in feature_dict.keys(): + # Make sure the full feature name is always present + feature_dict[full_feature_ref] = feature_dict[feature.name] + columns_to_cleanup.append(str(full_feature_ref)) + + output_dict: Dict[str, Any] = self.feature_transformation.transform( + feature_dict + ) + for feature_name in columns_to_cleanup: + del output_dict[feature_name] + return output_dict + + def get_transformed_features( + self, + features: Union[Dict[str, Any], pd.DataFrame], + full_feature_names: bool = False, + ) -> Union[Dict[str, Any], pd.DataFrame]: + # TODO: classic inheritance pattern....maybe fix this + if self.mode == "python" and isinstance(features, Dict): + # note full_feature_names is not needed for the dictionary + return self.get_transformed_features_dict( + feature_dict=features, + ) + elif self.mode == "pandas" and isinstance(features, pd.DataFrame): + return self.get_transformed_features_df( + df_with_features=features, + full_feature_names=full_feature_names, + ) + else: + raise Exception( + f'Invalid OnDemandFeatureMode: {self.mode}. Expected one of "pandas" or "python".' + ) + def infer_features(self) -> None: + if self.mode in {"pandas", "substrait"}: + self._infer_features_df() + elif self.mode == "python": + self._infer_features_dict() + else: + raise Exception( + f'Invalid OnDemandFeatureMode: {self.mode}. Expected one of "pandas" or "python".' + ) + + def _infer_features_dict(self): + """ + Infers the set of features associated to this feature view from the input source. + + Raises: + RegistryInferenceFailure: The set of features could not be inferred. + """ + rand_dict_value: Dict[str, Any] = { + "float": [1.0], + "int": [1], + "str": ["hello world"], + "bytes": [str.encode("hello world")], + "bool": [True], + "datetime64[ns]": [datetime.utcnow()], + } + + feature_dict = {} + for feature_view_projection in self.source_feature_view_projections.values(): + for feature in feature_view_projection.features: + dtype = feast_value_type_to_pandas_type(feature.dtype.to_value_type()) + feature_dict[f"{feature_view_projection.name}__{feature.name}"] = ( + rand_dict_value[dtype] if dtype in rand_dict_value else [None] + ) + feature_dict[f"{feature.name}"] = ( + rand_dict_value[dtype] if dtype in rand_dict_value else [None] + ) + for request_data in self.source_request_sources.values(): + for field in request_data.schema: + dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type()) + feature_dict[f"{field.name}"] = ( + rand_dict_value[dtype] if dtype in rand_dict_value else [None] + ) + + output_dict: Dict[str, List[Any]] = self.feature_transformation.transform( + feature_dict + ) + inferred_features = [] + for f, dt in output_dict.items(): + inferred_features.append( + Field( + name=f, + dtype=from_value_type( + python_type_to_feast_value_type( + f, type_name=type(dt[0]).__name__ + ) + ), + ) + ) + + if self.features: + missing_features = [] + for specified_features in self.features: + if specified_features not in inferred_features: + missing_features.append(specified_features) + if missing_features: + raise SpecifiedFeaturesNotPresentError( + missing_features, inferred_features, self.name + ) + else: + self.features = inferred_features + + if not self.features: + raise RegistryInferenceFailure( + "OnDemandFeatureView", + f"Could not infer Features for the feature view '{self.name}'.", + ) + + def _infer_features_df(self) -> None: """ Infers the set of features associated to this feature view from the input source. @@ -422,6 +582,7 @@ def infer_features(self) -> None: dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type()) sample_val = rand_df_value[dtype] if dtype in rand_df_value else None df[f"{field.name}"] = pd.Series(sample_val, dtype=dtype) + output_df: pd.DataFrame = self.feature_transformation.transform(df) inferred_features = [] for f, dt in zip(output_df.columns, output_df.dtypes): @@ -478,6 +639,7 @@ def on_demand_feature_view( FeatureViewProjection, ] ], + mode: str = "pandas", description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", @@ -491,6 +653,7 @@ def on_demand_feature_view( sources: A map from input source names to the actual input sources, which may be feature views, or request data sources. These sources serve as inputs to the udf, which will refer to them by name. + mode: The mode of execution (e.g,. Pandas or Python Native) description (optional): A human-readable description. tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the on demand feature view, typically the email @@ -504,6 +667,7 @@ def mainify(obj) -> None: obj.__module__ = "__main__" def decorator(user_function): + return_annotation = inspect.signature(user_function).return_annotation if ( return_annotation @@ -544,13 +708,27 @@ def decorator(user_function): else: udf_string = dill.source.getsource(user_function) mainify(user_function) - transformation = PandasTransformation(user_function, udf_string) + if mode == "pandas": + if return_annotation not in (inspect._empty, pd.DataFrame): + raise TypeError( + f"return signature for {user_function} is {return_annotation} but should be pd.DataFrame" + ) + transformation = PandasTransformation(user_function, udf_string) + elif mode == "python": + if return_annotation not in (inspect._empty, Dict[str, Any]): + raise TypeError( + f"return signature for {user_function} is {return_annotation} but should be Dict[str, Any]" + ) + transformation = PythonTransformation(user_function, udf_string) + elif mode == "substrait": + pass on_demand_feature_view_obj = OnDemandFeatureView( name=user_function.__name__, sources=sources, schema=schema, feature_transformation=transformation, + mode=mode, description=description, tags=tags, owner=owner, @@ -578,3 +756,8 @@ def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView: bfv.features = copy.copy(fv.features) bfv.entities = copy.copy(fv.entities) return bfv + + +def _empty_odfv_udf_fn(x: Any) -> Any: + # just an identity mapping, otherwise we risk tripping some downstream tests + return x diff --git a/sdk/python/feast/transformation/pandas_transformation.py b/sdk/python/feast/transformation/pandas_transformation.py index 76f17e2106..1838a882f2 100644 --- a/sdk/python/feast/transformation/pandas_transformation.py +++ b/sdk/python/feast/transformation/pandas_transformation.py @@ -11,7 +11,7 @@ class PandasTransformation: def __init__(self, udf: FunctionType, udf_string: str = ""): """ - Creates an OnDemandPandasTransformation object. + Creates an PandasTransformation object. Args: udf: The user defined transformation function, which must take pandas @@ -21,8 +21,17 @@ def __init__(self, udf: FunctionType, udf_string: str = ""): self.udf = udf self.udf_string = udf_string - def transform(self, df: pd.DataFrame) -> pd.DataFrame: - return self.udf.__call__(df) + def transform(self, input_df: pd.DataFrame) -> pd.DataFrame: + if not isinstance(input_df, pd.DataFrame): + raise TypeError( + f"input_df should be type pd.DataFrame but got {type(input_df).__name__}" + ) + output_df = self.udf.__call__(input_df) + if not isinstance(output_df, pd.DataFrame): + raise TypeError( + f"output_df should be type pd.DataFrame but got {type(output_df).__name__}" + ) + return output_df def __eq__(self, other): if not isinstance(other, PandasTransformation): diff --git a/sdk/python/feast/transformation/python_transformation.py b/sdk/python/feast/transformation/python_transformation.py new file mode 100644 index 0000000000..9519f23c05 --- /dev/null +++ b/sdk/python/feast/transformation/python_transformation.py @@ -0,0 +1,65 @@ +from types import FunctionType +from typing import Dict + +import dill + +from feast.protos.feast.core.Transformation_pb2 import ( + UserDefinedFunctionV2 as UserDefinedFunctionProto, +) + + +class PythonTransformation: + def __init__(self, udf: FunctionType, udf_string: str = ""): + """ + Creates an PythonTransformation object. + Args: + udf: The user defined transformation function, which must take pandas + dataframes as inputs. + udf_string: The source code version of the udf (for diffing and displaying in Web UI) + """ + self.udf = udf + self.udf_string = udf_string + + def transform(self, input_dict: Dict) -> Dict: + if not isinstance(input_dict, Dict): + raise TypeError( + f"input_dict should be type Dict[str, Any] but got {type(input_dict).__name__}" + ) + # Ensuring that the inputs are included as well + output_dict = self.udf.__call__(input_dict) + if not isinstance(output_dict, Dict): + raise TypeError( + f"output_dict should be type Dict[str, Any] but got {type(output_dict).__name__}" + ) + return {**input_dict, **output_dict} + + def __eq__(self, other): + if not isinstance(other, PythonTransformation): + raise TypeError( + "Comparisons should only involve PythonTransformation class objects." + ) + + if not super().__eq__(other): + return False + + if ( + self.udf_string != other.udf_string + or self.udf.__code__.co_code != other.udf.__code__.co_code + ): + return False + + return True + + def to_proto(self) -> UserDefinedFunctionProto: + return UserDefinedFunctionProto( + name=self.udf.__name__, + body=dill.dumps(self.udf, recurse=True), + body_text=self.udf_string, + ) + + @classmethod + def from_proto(cls, user_defined_function_proto: UserDefinedFunctionProto): + return PythonTransformation( + udf=dill.loads(user_defined_function_proto.body), + udf_string=user_defined_function_proto.body_text, + ) diff --git a/sdk/python/feast/transformation_server.py b/sdk/python/feast/transformation_server.py index 83f4af749e..34fe3eac76 100644 --- a/sdk/python/feast/transformation_server.py +++ b/sdk/python/feast/transformation_server.py @@ -47,6 +47,11 @@ def TransformFeatures(self, request, context): df = pa.ipc.open_file(request.transformation_input.arrow_value).read_pandas() + if odfv.mode != "pandas": + raise Exception( + f'OnDemandFeatureView mode "{odfv.mode}" not supported by TransformationServer.' + ) + result_df = odfv.get_transformed_features_df(df, True) result_arrow = pa.Table.from_pandas(result_df) sink = pa.BufferOutputStream() diff --git a/sdk/python/tests/unit/infra/test_inference_unit_tests.py b/sdk/python/tests/unit/infra/test_inference_unit_tests.py index a108d397bd..be97a838bd 100644 --- a/sdk/python/tests/unit/infra/test_inference_unit_tests.py +++ b/sdk/python/tests/unit/infra/test_inference_unit_tests.py @@ -1,3 +1,5 @@ +from typing import Any, Dict + import pandas as pd import pytest @@ -51,7 +53,7 @@ def test_infer_datasource_names_dwh(): data_source = dwh_class(query="test_query") -def test_on_demand_features_type_inference(): +def test_on_demand_features_valid_type_inference(): # Create Feature Views date_request = RequestSource( name="date_request", @@ -73,6 +75,31 @@ def test_view(features_df: pd.DataFrame) -> pd.DataFrame: test_view.infer_features() + @on_demand_feature_view( + sources=[date_request], + schema=[ + Field(name="output", dtype=UnixTimestamp), + Field(name="object_output", dtype=String), + ], + mode="python", + ) + def python_native_test_view(input_dict: Dict[str, Any]) -> Dict[str, Any]: + output_dict: Dict[str, Any] = { + "output": input_dict["some_date"], + "object_output": str(input_dict["some_date"]), + } + return output_dict + + python_native_test_view.infer_features() + + +def test_on_demand_features_invalid_type_inference(): + # Create Feature Views + date_request = RequestSource( + name="date_request", + schema=[Field(name="some_date", dtype=UnixTimestamp)], + ) + @on_demand_feature_view( sources=[date_request], schema=[ @@ -96,13 +123,49 @@ def invalid_test_view(features_df: pd.DataFrame) -> pd.DataFrame: ], sources=[date_request], ) - def test_view_with_missing_feature(features_df: pd.DataFrame) -> pd.DataFrame: + def view_with_missing_feature(features_df: pd.DataFrame) -> pd.DataFrame: data = pd.DataFrame() data["output"] = features_df["some_date"] return data with pytest.raises(SpecifiedFeaturesNotPresentError): - test_view_with_missing_feature.infer_features() + view_with_missing_feature.infer_features() + + with pytest.raises(TypeError): + + @on_demand_feature_view( + sources=[date_request], + schema=[ + Field(name="output", dtype=UnixTimestamp), + Field(name="object_output", dtype=String), + ], + mode="pandas", + ) + def python_native_test_invalid_pandas_view( + input_dict: Dict[str, Any] + ) -> Dict[str, Any]: + output_dict: Dict[str, Any] = { + "output": input_dict["some_date"], + "object_output": str(input_dict["some_date"]), + } + return output_dict + + with pytest.raises(TypeError): + + @on_demand_feature_view( + sources=[date_request], + schema=[ + Field(name="output", dtype=UnixTimestamp), + Field(name="object_output", dtype=String), + ], + mode="python", + ) + def python_native_test_invalid_dict_view( + features_df: pd.DataFrame, + ) -> pd.DataFrame: + data = pd.DataFrame() + data["output"] = features_df["some_date"] + return data def test_datasource_inference(): diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index d561bd8e84..02e013e775 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -12,13 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Any, Dict, List + import pandas as pd import pytest from feast.feature_view import FeatureView from feast.field import Field from feast.infra.offline_stores.file_source import FileSource -from feast.on_demand_feature_view import OnDemandFeatureView, PandasTransformation +from feast.on_demand_feature_view import ( + OnDemandFeatureView, + PandasTransformation, + PythonTransformation, +) from feast.types import Float32 @@ -32,10 +38,18 @@ def udf1(features_df: pd.DataFrame) -> pd.DataFrame: def udf2(features_df: pd.DataFrame) -> pd.DataFrame: df = pd.DataFrame() df["output1"] = features_df["feature1"] + 100 - df["output2"] = features_df["feature2"] + 100 + df["output2"] = features_df["feature2"] + 101 return df +def python_native_udf(features_dict: Dict[str, List[Any]]) -> Dict[str, Any]: + output_dict: Dict[str, List[Any]] = { + "output1": features_dict["feature1"] + 100, + "output2": features_dict["feature2"] + 101, + } + return output_dict + + @pytest.mark.filterwarnings("ignore:udf and udf_string parameters are deprecated") def test_hash(): file_source = FileSource(name="my-file-source", path="test.parquet") @@ -101,8 +115,9 @@ def test_hash(): Field(name="output1", dtype=Float32), Field(name="output2", dtype=Float32), ], - udf=udf2, - udf_string="udf2 source code", + feature_transformation=PandasTransformation( + udf=udf2, udf_string="udf2 source code" + ), description="test", ) @@ -128,6 +143,75 @@ def test_hash(): ) +def test_python_native_transformation_mode(): + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view = FeatureView( + name="my-feature-view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + sources = [feature_view] + + on_demand_feature_view_python_native = OnDemandFeatureView( + name="my-on-demand-feature-view", + sources=sources, + schema=[ + Field(name="output1", dtype=Float32), + Field(name="output2", dtype=Float32), + ], + feature_transformation=PythonTransformation( + udf=python_native_udf, udf_string="python native udf source code" + ), + description="test", + mode="python", + ) + + on_demand_feature_view_python_native_err = OnDemandFeatureView( + name="my-on-demand-feature-view", + sources=sources, + schema=[ + Field(name="output1", dtype=Float32), + Field(name="output2", dtype=Float32), + ], + feature_transformation=PandasTransformation( + udf=python_native_udf, udf_string="python native udf source code" + ), + description="test", + mode="python", + ) + + assert ( + on_demand_feature_view_python_native.feature_transformation + == PythonTransformation(python_native_udf, "python native udf source code") + ) + + with pytest.raises(TypeError): + assert ( + on_demand_feature_view_python_native_err.feature_transformation + == PythonTransformation(python_native_udf, "python native udf source code") + ) + + with pytest.raises(TypeError): + # This should fail + on_demand_feature_view_python_native_err.feature_transformation.transform( + { + "feature1": 0, + "feature2": 1, + } + ) + + assert on_demand_feature_view_python_native.get_transformed_features( + { + "feature1": 0, + "feature2": 1, + } + ) == {"feature1": 0, "feature2": 1, "output1": 100, "output2": 102} + + @pytest.mark.filterwarnings("ignore:udf and udf_string parameters are deprecated") def test_from_proto_backwards_compatible_udf(): file_source = FileSource(name="my-file-source", path="test.parquet") diff --git a/sdk/python/tests/unit/test_on_demand_substrait_transformation.py b/sdk/python/tests/unit/test_on_demand_substrait_transformation.py index c9d30c5b7a..378aa7ce3b 100644 --- a/sdk/python/tests/unit/test_on_demand_substrait_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_substrait_transformation.py @@ -71,6 +71,7 @@ def pandas_view(inputs: pd.DataFrame) -> pd.DataFrame: @on_demand_feature_view( sources=[driver_stats_fv[["conv_rate", "acc_rate"]]], schema=[Field(name="conv_rate_plus_acc_substrait", dtype=Float64)], + mode="substrait", ) def substrait_view(inputs: Table) -> Table: return inputs.select(