diff --git a/common/persistence/visibility/manager/visibility_manager.go b/common/persistence/visibility/manager/visibility_manager.go index c89332afb3f..a7cd0abb9f0 100644 --- a/common/persistence/visibility/manager/visibility_manager.go +++ b/common/persistence/visibility/manager/visibility_manager.go @@ -47,6 +47,7 @@ type ( GetStoreNames() []string HasStoreName(stName string) bool GetIndexName() string + ValidateCustomSearchAttributes(searchAttributes map[string]any) (map[string]any, error) // Write APIs. RecordWorkflowExecutionStarted(ctx context.Context, request *RecordWorkflowExecutionStartedRequest) error diff --git a/common/persistence/visibility/manager/visibility_manager_mock.go b/common/persistence/visibility/manager/visibility_manager_mock.go index 49e2163f696..245af95e00f 100644 --- a/common/persistence/visibility/manager/visibility_manager_mock.go +++ b/common/persistence/visibility/manager/visibility_manager_mock.go @@ -347,3 +347,18 @@ func (mr *MockVisibilityManagerMockRecorder) UpsertWorkflowExecution(ctx, reques mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpsertWorkflowExecution", reflect.TypeOf((*MockVisibilityManager)(nil).UpsertWorkflowExecution), ctx, request) } + +// ValidateCustomSearchAttributes mocks base method. +func (m *MockVisibilityManager) ValidateCustomSearchAttributes(searchAttributes map[string]any) (map[string]any, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ValidateCustomSearchAttributes", searchAttributes) + ret0, _ := ret[0].(map[string]any) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ValidateCustomSearchAttributes indicates an expected call of ValidateCustomSearchAttributes. +func (mr *MockVisibilityManagerMockRecorder) ValidateCustomSearchAttributes(searchAttributes interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ValidateCustomSearchAttributes", reflect.TypeOf((*MockVisibilityManager)(nil).ValidateCustomSearchAttributes), searchAttributes) +} diff --git a/common/persistence/visibility/store/elasticsearch/visibility_store.go b/common/persistence/visibility/store/elasticsearch/visibility_store.go index 412d9a64fcb..cb9a139fd79 100644 --- a/common/persistence/visibility/store/elasticsearch/visibility_store.go +++ b/common/persistence/visibility/store/elasticsearch/visibility_store.go @@ -95,6 +95,10 @@ var _ store.VisibilityStore = (*visibilityStore)(nil) var ( errUnexpectedJSONFieldType = errors.New("unexpected JSON field type") + minTime = time.Unix(0, 0).UTC() + maxTime = time.Unix(0, math.MaxInt64).UTC() + maxStringLength = 32766 + // Default sorter uses the sorting order defined in the index template. // It is indirectly built so buildPaginationQuery can have access to // the fields names to build the page query from the token. @@ -164,6 +168,44 @@ func (s *visibilityStore) GetIndexName() string { return s.index } +func (s *visibilityStore) ValidateCustomSearchAttributes( + searchAttributes map[string]any, +) (map[string]any, error) { + validatedSearchAttributes := make(map[string]any, len(searchAttributes)) + var invalidValueErrs []error + for saName, saValue := range searchAttributes { + var err error + switch value := saValue.(type) { + case time.Time: + err = validateDatetime(value) + case []time.Time: + for _, item := range value { + if err = validateDatetime(item); err != nil { + break + } + } + case string: + err = validateString(value) + case []string: + for _, item := range value { + if err = validateString(item); err != nil { + break + } + } + } + if err != nil { + invalidValueErrs = append(invalidValueErrs, err) + continue + } + validatedSearchAttributes[saName] = saValue + } + var retError error + if len(invalidValueErrs) > 0 { + retError = store.NewVisibilityStoreInvalidValuesError(invalidValueErrs) + } + return validatedSearchAttributes, retError +} + func (s *visibilityStore) RecordWorkflowExecutionStarted( ctx context.Context, request *store.InternalRecordWorkflowExecutionStartedRequest, @@ -941,6 +983,14 @@ func (s *visibilityStore) generateESDoc(request *store.InternalVisibilityRequest s.metricsHandler.Counter(metrics.ElasticsearchDocumentGenerateFailuresCount.GetMetricName()).Record(1) return nil, serviceerror.NewInternal(fmt.Sprintf("Unable to decode search attributes: %v", err)) } + // This is to prevent existing tasks to fail indefinitely. + // If it's only invalid values error, then silently continue without them. + searchAttributes, err = s.ValidateCustomSearchAttributes(searchAttributes) + if err != nil { + if _, ok := err.(*store.VisibilityStoreInvalidValuesError); !ok { + return nil, err + } + } for saName, saValue := range searchAttributes { if saValue == nil { // If search attribute value is `nil`, it means that it shouldn't be added to the document. @@ -1321,3 +1371,25 @@ func parsePageTokenValue( )) } } + +func validateDatetime(value time.Time) error { + if value.Before(minTime) || value.After(maxTime) { + return serviceerror.NewInvalidArgument( + fmt.Sprintf("Date not supported in Elasticsearch: %v", value), + ) + } + return nil +} + +func validateString(value string) error { + if len(value) > maxStringLength { + return serviceerror.NewInvalidArgument( + fmt.Sprintf( + "Strings with more than %d bytes are not supported in Elasticsearch (got %s)", + maxStringLength, + value, + ), + ) + } + return nil +} diff --git a/common/persistence/visibility/store/errors.go b/common/persistence/visibility/store/errors.go index d779f842ba6..321aaa65b40 100644 --- a/common/persistence/visibility/store/errors.go +++ b/common/persistence/visibility/store/errors.go @@ -25,10 +25,33 @@ package store import ( + "strings" + "go.temporal.io/api/serviceerror" ) +type ( + VisibilityStoreInvalidValuesError struct { + errs []error + } +) + var ( // OperationNotSupportedErr is returned when visibility operation in not supported. OperationNotSupportedErr = serviceerror.NewInvalidArgument("Operation not supported. Please use on Elasticsearch") ) + +func (e *VisibilityStoreInvalidValuesError) Error() string { + var sb strings.Builder + sb.WriteString("Visibility store invalid values errors: ") + for _, err := range e.errs { + sb.WriteString("[") + sb.WriteString(err.Error()) + sb.WriteString("]") + } + return sb.String() +} + +func NewVisibilityStoreInvalidValuesError(errs []error) error { + return &VisibilityStoreInvalidValuesError{errs: errs} +} diff --git a/common/persistence/visibility/store/sql/visibility_store.go b/common/persistence/visibility/store/sql/visibility_store.go index c1378cf89b6..55a988c523e 100644 --- a/common/persistence/visibility/store/sql/visibility_store.go +++ b/common/persistence/visibility/store/sql/visibility_store.go @@ -92,6 +92,12 @@ func (s *VisibilityStore) GetIndexName() string { return s.sqlStore.GetDbName() } +func (s *VisibilityStore) ValidateCustomSearchAttributes( + searchAttributes map[string]any, +) (map[string]any, error) { + return searchAttributes, nil +} + func (s *VisibilityStore) RecordWorkflowExecutionStarted( ctx context.Context, request *store.InternalRecordWorkflowExecutionStartedRequest, @@ -514,6 +520,14 @@ func (s *VisibilityStore) prepareSearchAttributesForDb( if err != nil { return nil, err } + // This is to prevent existing tasks to fail indefinitely. + // If it's only invalid values error, then silently continue without them. + searchAttributes, err = s.ValidateCustomSearchAttributes(searchAttributes) + if err != nil { + if _, ok := err.(*store.VisibilityStoreInvalidValuesError); !ok { + return nil, err + } + } for name, value := range searchAttributes { if value == nil { diff --git a/common/persistence/visibility/store/standard/cassandra/visibility_store.go b/common/persistence/visibility/store/standard/cassandra/visibility_store.go index f817bf52e6e..570345b0fac 100644 --- a/common/persistence/visibility/store/standard/cassandra/visibility_store.go +++ b/common/persistence/visibility/store/standard/cassandra/visibility_store.go @@ -177,6 +177,12 @@ func (v *visibilityStore) GetIndexName() string { return "" } +func (v *visibilityStore) ValidateCustomSearchAttributes( + searchAttributes map[string]any, +) (map[string]any, error) { + return searchAttributes, nil +} + // Close releases the resources held by this object func (v *visibilityStore) Close() { v.session.Close() diff --git a/common/persistence/visibility/store/standard/sql/visibility_store.go b/common/persistence/visibility/store/standard/sql/visibility_store.go index 30e7edfdc6b..ec74f4fd7c0 100644 --- a/common/persistence/visibility/store/standard/sql/visibility_store.go +++ b/common/persistence/visibility/store/standard/sql/visibility_store.go @@ -86,6 +86,12 @@ func (s *visibilityStore) GetIndexName() string { return "" } +func (s *visibilityStore) ValidateCustomSearchAttributes( + searchAttributes map[string]any, +) (map[string]any, error) { + return searchAttributes, nil +} + func (s *visibilityStore) RecordWorkflowExecutionStarted( ctx context.Context, request *store.InternalRecordWorkflowExecutionStartedRequest, diff --git a/common/persistence/visibility/store/standard/visibility_store.go b/common/persistence/visibility/store/standard/visibility_store.go index ba21dcaaaf6..8169dc5dd2b 100644 --- a/common/persistence/visibility/store/standard/visibility_store.go +++ b/common/persistence/visibility/store/standard/visibility_store.go @@ -79,6 +79,12 @@ func (s *standardStore) GetIndexName() string { return "" } +func (s *standardStore) ValidateCustomSearchAttributes( + searchAttributes map[string]any, +) (map[string]any, error) { + return searchAttributes, nil +} + func (s *standardStore) RecordWorkflowExecutionStarted( ctx context.Context, request *store.InternalRecordWorkflowExecutionStartedRequest, diff --git a/common/persistence/visibility/store/visibility_store.go b/common/persistence/visibility/store/visibility_store.go index b2ae07171cf..bf98acf7dfd 100644 --- a/common/persistence/visibility/store/visibility_store.go +++ b/common/persistence/visibility/store/visibility_store.go @@ -45,6 +45,11 @@ type ( GetName() string GetIndexName() string + // Validate search attributes based on the store constraints. It returns a new map containing + // only search attributes with valid values. If there are invalid values, an error of type + // VisibilityStoreInvalidValuesError wraps all invalid values errors. + ValidateCustomSearchAttributes(searchAttributes map[string]any) (map[string]any, error) + // Write APIs. RecordWorkflowExecutionStarted(ctx context.Context, request *InternalRecordWorkflowExecutionStartedRequest) error RecordWorkflowExecutionClosed(ctx context.Context, request *InternalRecordWorkflowExecutionClosedRequest) error diff --git a/common/persistence/visibility/store/visibility_store_mock.go b/common/persistence/visibility/store/visibility_store_mock.go index 013a9b3e77e..5ceab4df693 100644 --- a/common/persistence/visibility/store/visibility_store_mock.go +++ b/common/persistence/visibility/store/visibility_store_mock.go @@ -319,3 +319,18 @@ func (mr *MockVisibilityStoreMockRecorder) UpsertWorkflowExecution(ctx, request mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpsertWorkflowExecution", reflect.TypeOf((*MockVisibilityStore)(nil).UpsertWorkflowExecution), ctx, request) } + +// ValidateCustomSearchAttributes mocks base method. +func (m *MockVisibilityStore) ValidateCustomSearchAttributes(searchAttributes map[string]any) (map[string]any, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ValidateCustomSearchAttributes", searchAttributes) + ret0, _ := ret[0].(map[string]any) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ValidateCustomSearchAttributes indicates an expected call of ValidateCustomSearchAttributes. +func (mr *MockVisibilityStoreMockRecorder) ValidateCustomSearchAttributes(searchAttributes interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ValidateCustomSearchAttributes", reflect.TypeOf((*MockVisibilityStore)(nil).ValidateCustomSearchAttributes), searchAttributes) +} diff --git a/common/persistence/visibility/visibility_manager_dual.go b/common/persistence/visibility/visibility_manager_dual.go index 1fb218b1673..f8cbd52f8b3 100644 --- a/common/persistence/visibility/visibility_manager_dual.go +++ b/common/persistence/visibility/visibility_manager_dual.go @@ -81,6 +81,22 @@ func (v *visibilityManagerDual) GetIndexName() string { return v.visibilityManager.GetIndexName() } +func (v *visibilityManagerDual) ValidateCustomSearchAttributes( + searchAttributes map[string]any, +) (map[string]any, error) { + ms, err := v.managerSelector.writeManagers() + if err != nil { + return nil, err + } + for _, m := range ms { + searchAttributes, err = m.ValidateCustomSearchAttributes(searchAttributes) + if err != nil { + return nil, err + } + } + return searchAttributes, nil +} + func (v *visibilityManagerDual) RecordWorkflowExecutionStarted( ctx context.Context, request *manager.RecordWorkflowExecutionStartedRequest, diff --git a/common/persistence/visibility/visibility_manager_impl.go b/common/persistence/visibility/visibility_manager_impl.go index 24c745671a9..f42f2d0b8e0 100644 --- a/common/persistence/visibility/visibility_manager_impl.go +++ b/common/persistence/visibility/visibility_manager_impl.go @@ -89,6 +89,12 @@ func (p *visibilityManagerImpl) GetIndexName() string { return p.store.GetIndexName() } +func (p *visibilityManagerImpl) ValidateCustomSearchAttributes( + searchAttributes map[string]any, +) (map[string]any, error) { + return p.store.ValidateCustomSearchAttributes(searchAttributes) +} + func (p *visibilityManagerImpl) RecordWorkflowExecutionStarted( ctx context.Context, request *manager.RecordWorkflowExecutionStartedRequest, diff --git a/common/persistence/visibility/visibility_manager_rate_limited.go b/common/persistence/visibility/visibility_manager_rate_limited.go index 62446259b36..2f41f1844c5 100644 --- a/common/persistence/visibility/visibility_manager_rate_limited.go +++ b/common/persistence/visibility/visibility_manager_rate_limited.go @@ -80,6 +80,12 @@ func (m *visibilityManagerRateLimited) GetIndexName() string { return m.delegate.GetIndexName() } +func (m *visibilityManagerRateLimited) ValidateCustomSearchAttributes( + searchAttributes map[string]any, +) (map[string]any, error) { + return m.delegate.ValidateCustomSearchAttributes(searchAttributes) +} + // Below are write APIs. func (m *visibilityManagerRateLimited) RecordWorkflowExecutionStarted( diff --git a/common/persistence/visibility/visiblity_manager_metrics.go b/common/persistence/visibility/visiblity_manager_metrics.go index d9ce840de90..65c7c0cfd60 100644 --- a/common/persistence/visibility/visiblity_manager_metrics.go +++ b/common/persistence/visibility/visiblity_manager_metrics.go @@ -82,6 +82,12 @@ func (m *visibilityManagerMetrics) GetIndexName() string { return m.delegate.GetIndexName() } +func (m *visibilityManagerMetrics) ValidateCustomSearchAttributes( + searchAttributes map[string]any, +) (map[string]any, error) { + return m.delegate.ValidateCustomSearchAttributes(searchAttributes) +} + func (m *visibilityManagerMetrics) RecordWorkflowExecutionStarted( ctx context.Context, request *manager.RecordWorkflowExecutionStartedRequest, diff --git a/common/searchattribute/validator.go b/common/searchattribute/validator.go index c3a297a19cd..8aaf0025232 100644 --- a/common/searchattribute/validator.go +++ b/common/searchattribute/validator.go @@ -34,6 +34,7 @@ import ( "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/payload" + "go.temporal.io/server/common/persistence/visibility/manager" ) type ( @@ -44,7 +45,7 @@ type ( searchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithNamespaceFilter searchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter searchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter - indexName string + visibilityManager manager.VisibilityManager // allowList allows list of values when it's not keyword list type. allowList bool @@ -58,7 +59,7 @@ func NewValidator( searchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithNamespaceFilter, searchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter, searchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter, - indexName string, + visibilityManager manager.VisibilityManager, allowList bool, ) *Validator { return &Validator{ @@ -67,7 +68,7 @@ func NewValidator( searchAttributesNumberOfKeysLimit: searchAttributesNumberOfKeysLimit, searchAttributesSizeOfValueLimit: searchAttributesSizeOfValueLimit, searchAttributesTotalSizeLimit: searchAttributesTotalSizeLimit, - indexName: indexName, + visibilityManager: visibilityManager, allowList: allowList, } } @@ -90,13 +91,17 @@ func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namesp ) } - saTypeMap, err := v.searchAttributesProvider.GetSearchAttributes(v.indexName, false) + saTypeMap, err := v.searchAttributesProvider.GetSearchAttributes( + v.visibilityManager.GetIndexName(), + false, + ) if err != nil { return serviceerror.NewInvalidArgument( fmt.Sprintf("unable to get search attributes from cluster metadata: %v", err), ) } + saMap := make(map[string]any, len(searchAttributes.GetIndexedFields())) for saFieldName, saPayload := range searchAttributes.GetIndexedFields() { // user search attribute cannot be a system search attribute if _, err = saTypeMap.getType(saFieldName, systemCategory); err == nil { @@ -121,7 +126,7 @@ func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namesp ) } - _, err = DecodeValue(saPayload, saType, v.allowList) + saValue, err := DecodeValue(saPayload, saType, v.allowList) if err != nil { var invalidValue interface{} if err = payload.Decode(saPayload, &invalidValue); err != nil { @@ -138,8 +143,10 @@ func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namesp namespace, ) } + saMap[saFieldName] = saValue } - return nil + _, err = v.visibilityManager.ValidateCustomSearchAttributes(saMap) + return err } // ValidateSize validate search attributes are valid for writing and not exceed limits. diff --git a/common/searchattribute/validator_test.go b/common/searchattribute/validator_test.go index 71ad45546b7..e95930041df 100644 --- a/common/searchattribute/validator_test.go +++ b/common/searchattribute/validator_test.go @@ -27,19 +27,35 @@ package searchattribute import ( "testing" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/suite" commonpb "go.temporal.io/api/common/v1" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/payload" + "go.temporal.io/server/common/persistence/visibility/manager" ) type searchAttributesValidatorSuite struct { suite.Suite + + mockVisibilityManager *manager.MockVisibilityManager } func TestSearchAttributesValidatorSuite(t *testing.T) { - s := &searchAttributesValidatorSuite{} + ctrl := gomock.NewController(t) + s := &searchAttributesValidatorSuite{ + mockVisibilityManager: manager.NewMockVisibilityManager(ctrl), + } + s.mockVisibilityManager.EXPECT().GetIndexName().Return("").AnyTimes() + s.mockVisibilityManager.EXPECT(). + ValidateCustomSearchAttributes(gomock.Any()). + DoAndReturn( + func(searchAttributes map[string]any) (map[string]any, error) { + return searchAttributes, nil + }, + ). + AnyTimes() suite.Run(t, s) } @@ -54,7 +70,7 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidate() { dynamicconfig.GetIntPropertyFilteredByNamespace(numOfKeysLimit), dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfValueLimit), dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfTotalLimit), - "", + s.mockVisibilityManager, true, ) @@ -131,7 +147,7 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidate_Mapper() { dynamicconfig.GetIntPropertyFilteredByNamespace(numOfKeysLimit), dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfValueLimit), dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfTotalLimit), - "", + s.mockVisibilityManager, false, ) @@ -194,7 +210,7 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidateSize() { dynamicconfig.GetIntPropertyFilteredByNamespace(numOfKeysLimit), dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfValueLimit), dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfTotalLimit), - "", + s.mockVisibilityManager, false, ) @@ -233,7 +249,7 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidateSize_Mapper dynamicconfig.GetIntPropertyFilteredByNamespace(numOfKeysLimit), dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfValueLimit), dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfTotalLimit), - "", + s.mockVisibilityManager, false, ) diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index d6dbe662796..c8d9c247a2e 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -190,7 +190,7 @@ func NewWorkflowHandler( config.SearchAttributesNumberOfKeysLimit, config.SearchAttributesSizeOfValueLimit, config.SearchAttributesTotalSizeLimit, - visibilityMrg.GetIndexName(), + visibilityMrg, visibility.AllowListForValidation(visibilityMrg.GetStoreNames()), ), archivalMetadata: archivalMetadata, diff --git a/service/history/commandChecker_test.go b/service/history/commandChecker_test.go index 3792afd9377..79770deea01 100644 --- a/service/history/commandChecker_test.go +++ b/service/history/commandChecker_test.go @@ -49,6 +49,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/payloads" + "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service/history/configs" @@ -83,8 +84,9 @@ type ( suite.Suite *require.Assertions - controller *gomock.Controller - mockNamespaceCache *namespace.MockRegistry + controller *gomock.Controller + mockNamespaceCache *namespace.MockRegistry + mockVisibilityManager *manager.MockVisibilityManager validator *commandAttrValidator @@ -111,6 +113,18 @@ func (s *commandAttrValidatorSuite) SetupTest() { s.controller = gomock.NewController(s.T()) s.mockNamespaceCache = namespace.NewMockRegistry(s.controller) + + s.mockVisibilityManager = manager.NewMockVisibilityManager(s.controller) + s.mockVisibilityManager.EXPECT().GetIndexName().Return("index-name").AnyTimes() + s.mockVisibilityManager.EXPECT(). + ValidateCustomSearchAttributes(gomock.Any()). + DoAndReturn( + func(searchAttributes map[string]any) (map[string]any, error) { + return searchAttributes, nil + }, + ). + AnyTimes() + config := &configs.Config{ MaxIDLengthLimit: dynamicconfig.GetIntPropertyFn(1000), SearchAttributesNumberOfKeysLimit: dynamicconfig.GetIntPropertyFilteredByNamespace(100), @@ -130,7 +144,7 @@ func (s *commandAttrValidatorSuite) SetupTest() { config.SearchAttributesNumberOfKeysLimit, config.SearchAttributesSizeOfValueLimit, config.SearchAttributesTotalSizeLimit, - "index-name", + s.mockVisibilityManager, false, )) } diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 5d80b193d0f..d97604b52a3 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -233,7 +233,7 @@ func NewEngineWithShardContext( config.SearchAttributesNumberOfKeysLimit, config.SearchAttributesSizeOfValueLimit, config.SearchAttributesTotalSizeLimit, - persistenceVisibilityMgr.GetIndexName(), + persistenceVisibilityMgr, visibility.AllowListForValidation(persistenceVisibilityMgr.GetStoreNames()), ) diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index b36df0a8c82..add000f6e94 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -52,6 +52,7 @@ import ( persistencespb "go.temporal.io/server/api/persistence/v1" workflowspb "go.temporal.io/server/api/workflow/v1" "go.temporal.io/server/common/persistence/versionhistory" + "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/primitives/timestamp" serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/service/history/api" @@ -92,6 +93,7 @@ type ( mockEventsCache *events.MockCache mockNamespaceCache *namespace.MockRegistry mockClusterMetadata *cluster.MockMetadata + mockVisibilityManager *manager.MockVisibilityManager workflowCache wcache.Cache historyEngine *historyEngineImpl @@ -151,6 +153,7 @@ func (s *engine2Suite) SetupTest() { s.mockNamespaceCache = s.mockShard.Resource.NamespaceCache s.mockExecutionMgr = s.mockShard.Resource.ExecutionMgr s.mockClusterMetadata = s.mockShard.Resource.ClusterMetadata + s.mockVisibilityManager = s.mockShard.Resource.VisibilityManager s.mockEventsCache = s.mockShard.MockEventsCache s.mockNamespaceCache.EXPECT().GetNamespaceByID(tests.NamespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes() @@ -163,6 +166,15 @@ func (s *engine2Suite) SetupTest() { s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(false, common.EmptyVersion).Return(cluster.TestCurrentClusterName).AnyTimes() s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, tests.Version).Return(cluster.TestCurrentClusterName).AnyTimes() + s.mockVisibilityManager.EXPECT().GetIndexName().Return("").AnyTimes() + s.mockVisibilityManager.EXPECT(). + ValidateCustomSearchAttributes(gomock.Any()). + DoAndReturn( + func(searchAttributes map[string]any) (map[string]any, error) { + return searchAttributes, nil + }, + ). + AnyTimes() s.workflowCache = wcache.NewCache(s.mockShard) s.logger = log.NewMockLogger(s.controller) s.logger.EXPECT().Debug(gomock.Any(), gomock.Any()).AnyTimes() @@ -198,7 +210,7 @@ func (s *engine2Suite) SetupTest() { s.config.SearchAttributesNumberOfKeysLimit, s.config.SearchAttributesSizeOfValueLimit, s.config.SearchAttributesTotalSizeLimit, - "", + s.mockVisibilityManager, false, ), workflowConsistencyChecker: api.NewWorkflowConsistencyChecker(mockShard, s.workflowCache), diff --git a/service/history/workflowTaskHandlerCallbacks_test.go b/service/history/workflowTaskHandlerCallbacks_test.go index 3f8e24ff84c..a6f6ab96b74 100644 --- a/service/history/workflowTaskHandlerCallbacks_test.go +++ b/service/history/workflowTaskHandlerCallbacks_test.go @@ -101,6 +101,17 @@ func (s *WorkflowTaskHandlerCallbackSuite) SetupTest() { mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(false, common.EmptyVersion).Return(cluster.TestCurrentClusterName).AnyTimes() mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, tests.Version).Return(cluster.TestCurrentClusterName).AnyTimes() + mockVisibilityManager := mockShard.Resource.VisibilityManager + mockVisibilityManager.EXPECT().GetIndexName().Return("").AnyTimes() + mockVisibilityManager.EXPECT(). + ValidateCustomSearchAttributes(gomock.Any()). + DoAndReturn( + func(searchAttributes map[string]any) (map[string]any, error) { + return searchAttributes, nil + }, + ). + AnyTimes() + s.mockEventsCache = mockShard.MockEventsCache s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes() s.logger = mockShard.GetLogger() @@ -124,7 +135,7 @@ func (s *WorkflowTaskHandlerCallbackSuite) SetupTest() { config.SearchAttributesNumberOfKeysLimit, config.SearchAttributesSizeOfValueLimit, config.SearchAttributesTotalSizeLimit, - "", + mockShard.Resource.VisibilityManager, false, ), workflowConsistencyChecker: api.NewWorkflowConsistencyChecker(mockShard, workflowCache),