diff --git a/go/internal/feast/errors.go b/go/internal/feast/errors.go new file mode 100644 index 0000000000..f42b4aad82 --- /dev/null +++ b/go/internal/feast/errors.go @@ -0,0 +1,22 @@ +package feast + +import ( + "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type FeastTransformationServiceNotConfigured struct{} + +func (FeastTransformationServiceNotConfigured) GRPCStatus() *status.Status { + errorStatus := status.New(codes.Internal, "No transformation service configured") + ds, err := errorStatus.WithDetails(&errdetails.LocalizedMessage{Message: "No transformation service configured, required for on-demand feature transformations"}) + if err != nil { + return errorStatus + } + return ds +} + +func (e FeastTransformationServiceNotConfigured) Error() string { + return e.GRPCStatus().Err().Error() +} diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index df4df7e199..abe1d195de 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -3,8 +3,9 @@ package feast import ( "context" "errors" - "fmt" + "github.com/apache/arrow/go/v17/arrow/memory" + //"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" "github.com/feast-dev/feast/go/internal/feast/model" @@ -60,17 +61,14 @@ func NewFeatureStore(config *registry.RepoConfig, callback transformation.Transf return nil, err } - // Use a scalable transformation service like Python Transformation Service. - // Assume the user will define the "transformation_service_endpoint" in the feature_store.yaml file - // under the "feature_server" section. - transformationServerEndpoint, ok := config.FeatureServer["transformation_service_endpoint"] - if !ok { - fmt.Println("Errors while reading transformation_service_endpoint info") - panic("No transformation service endpoint provided in the feature_store.yaml file.") + var transformationService *transformation.GrpcTransformationService + if transformationServerEndpoint, ok := config.FeatureServer["transformation_service_endpoint"]; ok { + // Use a scalable transformation service like Python Transformation Service. + // Assume the user will define the "transformation_service_endpoint" in the feature_store.yaml file + // under the "feature_server" section. + transformationService, _ = transformation.NewGrpcTransformationService(config, transformationServerEndpoint.(string)) } - transformationService, _ := transformation.NewGrpcTransformationService(config, transformationServerEndpoint.(string)) - return &FeatureStore{ config: config, registry: registry, @@ -112,6 +110,10 @@ func (fs *FeatureStore) GetOnlineFeatures( return nil, err } + if len(requestedOnDemandFeatureViews) > 0 && fs.transformationService == nil { + return nil, FeastTransformationServiceNotConfigured{} + } + entityNameToJoinKeyMap, expectedJoinKeysSet, err := onlineserving.GetEntityMaps(requestedFeatureViews, entities) if err != nil { return nil, err diff --git a/go/internal/feast/featurestore_test.go b/go/internal/feast/featurestore_test.go index f066b39df2..e1f908b906 100644 --- a/go/internal/feast/featurestore_test.go +++ b/go/internal/feast/featurestore_test.go @@ -2,124 +2,241 @@ package feast import ( "context" + "log" + "os" "path/filepath" "runtime" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "github.com/feast-dev/feast/go/internal/feast/onlinestore" "github.com/feast-dev/feast/go/internal/feast/registry" + "github.com/feast-dev/feast/go/internal/test" + "github.com/feast-dev/feast/go/protos/feast/serving" "github.com/feast-dev/feast/go/protos/feast/types" ) -// Return absolute path to the test_repo registry regardless of the working directory -func getRegistryPath() map[string]interface{} { +var featureRepoBasePath string +var featureRepoRegistryFile string + +func TestMain(m *testing.M) { // Get the file path of this source file, regardless of the working directory _, filename, _, ok := runtime.Caller(0) if !ok { - panic("couldn't find file path of the test file") + log.Print("couldn't find file path of the test file") + os.Exit(1) } - registry := map[string]interface{}{ - "path": filepath.Join(filename, "..", "..", "..", "feature_repo/data/registry.db"), + featureRepoBasePath = filepath.Join(filename, "..", "..", "test") + featureRepoRegistryFile = filepath.Join(featureRepoBasePath, "feature_repo", "data", "registry.db") + if err := test.SetupInitializedRepo(featureRepoBasePath); err != nil { + log.Print("Could not initialize test repo: ", err) + os.Exit(1) } - return registry + os.Exit(m.Run()) } func TestNewFeatureStore(t *testing.T) { - t.Skip("@todo(achals): feature_repo isn't checked in yet") - config := registry.RepoConfig{ - Project: "feature_repo", - Registry: getRegistryPath(), - Provider: "local", - OnlineStore: map[string]interface{}{ - "type": "redis", - }, - } - fs, err := NewFeatureStore(&config, nil) - assert.Nil(t, err) - assert.IsType(t, &onlinestore.RedisOnlineStore{}, fs.onlineStore) - - t.Run("valid config", func(t *testing.T) { - config := ®istry.RepoConfig{ - Project: "feature_repo", - Registry: getRegistryPath(), - Provider: "local", - OnlineStore: map[string]interface{}{ - "type": "redis", + tests := []struct { + name string + config *registry.RepoConfig + expectOnlineStoreType interface{} + errMessage string + }{ + { + name: "valid config", + config: ®istry.RepoConfig{ + Project: "feature_repo", + Registry: map[string]interface{}{ + "path": featureRepoRegistryFile, + }, + Provider: "local", + OnlineStore: map[string]interface{}{ + "type": "redis", + }, }, - FeatureServer: map[string]interface{}{ - "transformation_service_endpoint": "localhost:50051", + expectOnlineStoreType: &onlinestore.RedisOnlineStore{}, + }, + { + name: "valid config with transformation service endpoint", + config: ®istry.RepoConfig{ + Project: "feature_repo", + Registry: map[string]interface{}{ + "path": featureRepoRegistryFile, + }, + Provider: "local", + OnlineStore: map[string]interface{}{ + "type": "redis", + }, + FeatureServer: map[string]interface{}{ + "transformation_service_endpoint": "localhost:50051", + }, }, - } - fs, err := NewFeatureStore(config, nil) - assert.Nil(t, err) - assert.NotNil(t, fs) - assert.IsType(t, &onlinestore.RedisOnlineStore{}, fs.onlineStore) - assert.NotNil(t, fs.transformationService) - }) - - t.Run("missing transformation service endpoint", func(t *testing.T) { - config := ®istry.RepoConfig{ - Project: "feature_repo", - Registry: getRegistryPath(), - Provider: "local", - OnlineStore: map[string]interface{}{ - "type": "redis", + expectOnlineStoreType: &onlinestore.RedisOnlineStore{}, + }, + { + name: "invalid online store config", + config: ®istry.RepoConfig{ + Project: "feature_repo", + Registry: map[string]interface{}{ + "path": featureRepoRegistryFile, + }, + Provider: "local", + OnlineStore: map[string]interface{}{ + "type": "invalid_store", + }, }, - } - defer func() { - if r := recover(); r == nil { - t.Errorf("The code did not panic") + errMessage: "invalid_store online store type is currently not supported", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got, err := NewFeatureStore(test.config, nil) + if test.errMessage != "" { + assert.Nil(t, got) + require.Error(t, err) + assert.ErrorContains(t, err, test.errMessage) + + } else { + require.NoError(t, err) + assert.NotNil(t, got) + assert.IsType(t, test.expectOnlineStoreType, got.onlineStore) } - }() - NewFeatureStore(config, nil) - }) - - t.Run("invalid online store config", func(t *testing.T) { - config := ®istry.RepoConfig{ - Project: "feature_repo", - Registry: getRegistryPath(), - Provider: "local", - OnlineStore: map[string]interface{}{ - "type": "invalid_store", + }) + } + +} + +type MockRedis struct { + mock.Mock +} + +func (m *MockRedis) Destruct() {} +func (m *MockRedis) OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]onlinestore.FeatureData, error) { + args := m.Called(ctx, entityKeys, featureViewNames, featureNames) + var fd [][]onlinestore.FeatureData + if args.Get(0) != nil { + fd = args.Get(0).([][]onlinestore.FeatureData) + } + return fd, args.Error(1) +} + +func TestGetOnlineFeatures(t *testing.T) { + tests := []struct { + name string + config *registry.RepoConfig + fn func(*testing.T, *FeatureStore) + }{ + { + name: "redis with simple features", + config: ®istry.RepoConfig{ + Project: "feature_repo", + Registry: map[string]interface{}{ + "path": featureRepoRegistryFile, + }, + Provider: "local", + OnlineStore: map[string]interface{}{ + "type": "redis", + "connection_string": "localhost:6379", + }, }, - FeatureServer: map[string]interface{}{ - "transformation_service_endpoint": "localhost:50051", + fn: testRedisSimpleFeatures, + }, + { + name: "redis with On-demand feature views, no transformation service endpoint", + config: ®istry.RepoConfig{ + Project: "feature_repo", + Registry: map[string]interface{}{ + "path": featureRepoRegistryFile, + }, + Provider: "local", + OnlineStore: map[string]interface{}{ + "type": "redis", + "connection_string": "localhost:6379", + }, }, - } - fs, err := NewFeatureStore(config, nil) - assert.NotNil(t, err) - assert.Nil(t, fs) - }) + fn: testRedisODFVNoTransformationService, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + + fs, err := NewFeatureStore(test.config, nil) + require.Nil(t, err) + fs.onlineStore = new(MockRedis) + test.fn(t, fs) + }) + + } } -func TestGetOnlineFeaturesRedis(t *testing.T) { - t.Skip("@todo(achals): feature_repo isn't checked in yet") - config := registry.RepoConfig{ - Project: "feature_repo", - Registry: getRegistryPath(), - Provider: "local", - OnlineStore: map[string]interface{}{ - "type": "redis", - "connection_string": "localhost:6379", +func testRedisSimpleFeatures(t *testing.T, fs *FeatureStore) { + + featureNames := []string{"driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + } + entities := map[string]*types.RepeatedValue{"driver_id": {Val: []*types.Value{{Val: &types.Value_Int64Val{Int64Val: 1001}}, + {Val: &types.Value_Int64Val{Int64Val: 1002}}, + }}} + + results := [][]onlinestore.FeatureData{ + { + { + Reference: serving.FeatureReferenceV2{FeatureViewName: "driver_hourly_stats", FeatureName: "conv_rate"}, + Value: types.Value{Val: &types.Value_FloatVal{FloatVal: 12.0}}, + }, + { + Reference: serving.FeatureReferenceV2{FeatureViewName: "driver_hourly_stats", FeatureName: "acc_rate"}, + Value: types.Value{Val: &types.Value_FloatVal{FloatVal: 1.0}}, + }, + { + Reference: serving.FeatureReferenceV2{FeatureViewName: "driver_hourly_stats", FeatureName: "avg_daily_trips"}, + Value: types.Value{Val: &types.Value_Int64Val{Int64Val: 100}}, + }, + }, + { + + { + Reference: serving.FeatureReferenceV2{FeatureViewName: "driver_hourly_stats", FeatureName: "conv_rate"}, + Value: types.Value{Val: &types.Value_FloatVal{FloatVal: 24.0}}, + }, + { + Reference: serving.FeatureReferenceV2{FeatureViewName: "driver_hourly_stats", FeatureName: "acc_rate"}, + Value: types.Value{Val: &types.Value_FloatVal{FloatVal: 2.0}}, + }, + { + Reference: serving.FeatureReferenceV2{FeatureViewName: "driver_hourly_stats", FeatureName: "avg_daily_trips"}, + Value: types.Value{Val: &types.Value_Int64Val{Int64Val: 130}}, + }, }, } + ctx := context.Background() + mr := fs.onlineStore.(*MockRedis) + mr.On("OnlineRead", ctx, mock.Anything, mock.Anything, mock.Anything).Return(results, nil) + response, err := fs.GetOnlineFeatures(ctx, featureNames, nil, entities, map[string]*types.RepeatedValue{}, true) + require.Nil(t, err) + assert.Len(t, response, 4) // 3 Features + 1 entity = 4 columns (feature vectors) in response +} +func testRedisODFVNoTransformationService(t *testing.T, fs *FeatureStore) { featureNames := []string{"driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate", "driver_hourly_stats:avg_daily_trips", + "transformed_conv_rate:conv_rate_plus_val1", } entities := map[string]*types.RepeatedValue{"driver_id": {Val: []*types.Value{{Val: &types.Value_Int64Val{Int64Val: 1001}}, {Val: &types.Value_Int64Val{Int64Val: 1002}}, {Val: &types.Value_Int64Val{Int64Val: 1003}}}}, } - fs, err := NewFeatureStore(&config, nil) - assert.Nil(t, err) ctx := context.Background() - response, err := fs.GetOnlineFeatures( - ctx, featureNames, nil, entities, map[string]*types.RepeatedValue{}, true) - assert.Nil(t, err) - assert.Len(t, response, 4) // 3 Features + 1 entity = 4 columns (feature vectors) in response + mr := fs.onlineStore.(*MockRedis) + mr.On("OnlineRead", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + response, err := fs.GetOnlineFeatures(ctx, featureNames, nil, entities, map[string]*types.RepeatedValue{}, true) + assert.Nil(t, response) + assert.ErrorAs(t, err, &FeastTransformationServiceNotConfigured{}) + } diff --git a/go/internal/test/feature_repo/example.py b/go/internal/test/feature_repo/example.py index 7084361007..a814b58913 100644 --- a/go/internal/test/feature_repo/example.py +++ b/go/internal/test/feature_repo/example.py @@ -2,10 +2,12 @@ from datetime import timedelta -from feast import Entity, Feature, FeatureView, Field, FileSource, FeatureService +from feast import Entity, Feature, FeatureView, Field, FileSource, FeatureService, RequestSource from feast.feature_logging import LoggingConfig from feast.infra.offline_stores.file_source import FileLoggingDestination -from feast.types import Float32, Int64 +from feast.types import Float32, Float64, Int64, PrimitiveFeastType +from feast.on_demand_feature_view import on_demand_feature_view +import pandas as pd # Read data from parquet files. Parquet is convenient for local development mode. For # production, you can use your favorite DWH, such as BigQuery. See Feast documentation @@ -41,4 +43,32 @@ name="test_service", features=[driver_hourly_stats_view], logging_config=LoggingConfig(destination=FileLoggingDestination(path="")) -) \ No newline at end of file +) + + +# Define a request data source which encodes features / information only +# available at request time (e.g. part of the user initiated HTTP request) +input_request = RequestSource( + name="vals_to_add", + schema=[ + Field(name="val_to_add", dtype=PrimitiveFeastType.INT64), + Field(name="val_to_add_2", dtype=PrimitiveFeastType.INT64), + ] +) + +# Use the input data and feature view features to create new features +@on_demand_feature_view( + sources=[ + driver_hourly_stats_view, + input_request + ], + schema=[ + Field(name='conv_rate_plus_val1', dtype=Float64), + Field(name='conv_rate_plus_val2', dtype=Float64) + ] +) +def transformed_conv_rate(features_df: pd.DataFrame) -> pd.DataFrame: + df = pd.DataFrame() + df['conv_rate_plus_val1'] = (features_df['conv_rate'] + features_df['val_to_add']) + df['conv_rate_plus_val2'] = (features_df['conv_rate'] + features_df['val_to_add_2']) + return df