From 12655c46950ee8ef29baee10c14e3365c74fb106 Mon Sep 17 00:00:00 2001 From: Rodrigo Zhou Date: Mon, 24 Apr 2023 09:40:22 -0700 Subject: [PATCH] Refactor configs for visibility (#4095) New dynamic configs for visibility --- common/config/config.go | 3 + common/config/persistence.go | 156 ++++++++- common/dynamicconfig/collection.go | 5 + common/dynamicconfig/constants.go | 10 + .../visibility_persistence_suite_test.go | 7 +- common/persistence/visibility/defs.go | 86 +++++ common/persistence/visibility/factory.go | 297 ++++++++---------- .../visibility/manager_selector.go | 81 ++--- .../store/elasticsearch/client/config.go | 11 +- service/frontend/dcRedirectionPolicy_test.go | 2 +- service/frontend/fx.go | 14 +- .../frontend/redirection_interceptor_test.go | 1 + service/frontend/service.go | 34 +- service/frontend/workflow_handler_test.go | 26 +- service/history/configs/config.go | 35 +-- service/history/fx.go | 19 +- service/history/queue_factory_base_test.go | 1 + .../stream_receiver_monitor_test.go | 1 + service/history/tests/vars.go | 2 +- .../history/workflow/mutable_state_impl.go | 6 +- service/history/workflow/task_refresher.go | 11 +- service/worker/fx.go | 14 +- service/worker/service.go | 30 +- temporal/fx.go | 53 ++-- 24 files changed, 522 insertions(+), 383 deletions(-) diff --git a/common/config/config.go b/common/config/config.go index c6485990417..20b3fa36347 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -230,6 +230,9 @@ type ( DefaultStore string `yaml:"defaultStore" validate:"nonzero"` // VisibilityStore is the name of the datastore to be used for visibility records VisibilityStore string `yaml:"visibilityStore"` + // SecondaryVisibilityStore is the name of the secondary datastore to be used for visibility records + SecondaryVisibilityStore string `yaml:"secondaryVisibilityStore"` + // DEPRECATED: use VisibilityStore key instead of AdvancedVisibilityStore // AdvancedVisibilityStore is the name of the datastore to be used for visibility records AdvancedVisibilityStore string `yaml:"advancedVisibilityStore"` // NumHistoryShards is the desired number of history shards. This config doesn't diff --git a/common/config/persistence.go b/common/config/persistence.go index 31d07905dcd..1192e00c35c 100644 --- a/common/config/persistence.go +++ b/common/config/persistence.go @@ -31,6 +31,7 @@ import ( "strings" "github.com/gocql/gocql" + "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client" ) const ( @@ -54,6 +55,68 @@ func (c *Persistence) Validate() error { if c.VisibilityStore != "" { stores = append(stores, c.VisibilityStore) } + if c.SecondaryVisibilityStore != "" { + stores = append(stores, c.SecondaryVisibilityStore) + } + if c.AdvancedVisibilityStore != "" { + stores = append(stores, c.AdvancedVisibilityStore) + } + + // There are 3 config keys: + // - visibilityStore: can set any data store + // - secondaryVisibilityStore: can set any data store + // - advancedVisibilityStore: can only set elasticsearch data store + // If visibilityStore is set, then it's always the primary. + // If secondaryVisibilityStore is set, it's always the secondary. + // + // Valid dual visibility combinations (order: primary, secondary): + // - visibilityStore (standard), secondaryVisibilityStore (any) + // - visibilityStore (standard), advancedVisibilityStore (es) + // - visibilityStore (advanced sql), secondaryVisibilityStore (advanced sql) + // - visibilityStore (es), visibilityStore (es) [via elasticsearch.indices config] + // - advancedVisibilityStore (es), advancedVisibilityStore (es) [via elasticsearch.indices config] + // + // Invalid dual visibility combinations: + // - visibilityStore (advanced sql), secondaryVisibilityStore (standard, es) + // - visibilityStore (advanced sql), advancedVisibilityStore (es) + // - visibilityStore (es), secondaryVisibilityStore (any) + // - visibilityStore (es), advancedVisibilityStore (es) + // - advancedVisibilityStore (es), secondaryVisibilityStore (any) + // + // The validation for dual visibility pair (advanced sql, advanced sql) is in visibility factory + // due to circular dependency. This will be better after standard visibility is removed. + + if c.VisibilityStore == "" && c.AdvancedVisibilityStore == "" { + return errors.New("persistence config: visibilityStore must be specified") + } + if c.SecondaryVisibilityStore != "" && c.AdvancedVisibilityStore != "" { + return errors.New( + "persistence config: cannot specify both secondaryVisibilityStore and " + + "advancedVisibilityStore", + ) + } + if c.AdvancedVisibilityStore != "" && c.DataStores[c.AdvancedVisibilityStore].Elasticsearch == nil { + return fmt.Errorf( + "persistence config: advanced visibility datastore %q: missing elasticsearch config", + c.AdvancedVisibilityStore, + ) + } + if c.DataStores[c.VisibilityStore].Elasticsearch != nil && + (c.SecondaryVisibilityStore != "" || c.AdvancedVisibilityStore != "") { + return errors.New( + "persistence config: cannot set secondaryVisibilityStore or advancedVisibilityStore " + + "when visibilityStore is setting elasticsearch datastore", + ) + } + if c.DataStores[c.SecondaryVisibilityStore].Elasticsearch.GetSecondaryVisibilityIndex() != "" { + return fmt.Errorf( + "persistence config: secondary visibility datastore %q: elasticsearch config: "+ + "cannot set secondary_visibility", + c.SecondaryVisibilityStore, + ) + } + + cntEsConfigs := 0 for _, st := range stores { ds, ok := c.DataStores[st] if !ok { @@ -62,10 +125,17 @@ func (c *Persistence) Validate() error { if err := ds.Validate(); err != nil { return fmt.Errorf("persistence config: datastore %q: %s", st, err.Error()) } + if ds.Elasticsearch != nil { + cntEsConfigs++ + } } - if err := c.validateAdvancedVisibility(); err != nil { - return err + if cntEsConfigs > 1 { + return fmt.Errorf( + "persistence config: cannot have more than one Elasticsearch visibility store config " + + "(use `elasticsearch.indices.secondary_visibility` config key if you need to set a " + + "secondary Elasticsearch visibility store)", + ) } return nil @@ -76,34 +146,75 @@ func (c *Persistence) StandardVisibilityConfigExist() bool { return c.VisibilityStore != "" } +// SecondaryVisibilityConfigExist returns whether user specified secondaryVisibilityStore in config +func (c *Persistence) SecondaryVisibilityConfigExist() bool { + return c.SecondaryVisibilityStore != "" +} + // AdvancedVisibilityConfigExist returns whether user specified advancedVisibilityStore in config func (c *Persistence) AdvancedVisibilityConfigExist() bool { return c.AdvancedVisibilityStore != "" } func (c *Persistence) IsSQLVisibilityStore() bool { - return c.StandardVisibilityConfigExist() && c.DataStores[c.VisibilityStore].SQL != nil + return (c.StandardVisibilityConfigExist() && c.DataStores[c.VisibilityStore].SQL != nil) || + (c.SecondaryVisibilityConfigExist() && c.DataStores[c.SecondaryVisibilityStore].SQL != nil) } -func (c *Persistence) validateAdvancedVisibility() error { - if !c.StandardVisibilityConfigExist() && !c.AdvancedVisibilityConfigExist() { - return errors.New("persistence config: one of visibilityStore or advancedVisibilityStore must be specified") +func (c *Persistence) GetVisibilityStoreConfig() DataStore { + if c.VisibilityStore != "" { + return c.DataStores[c.VisibilityStore] } - - if !c.AdvancedVisibilityConfigExist() { - return nil + if c.AdvancedVisibilityStore != "" { + return c.DataStores[c.AdvancedVisibilityStore] } + // Based on validation above, this should never happen. + return DataStore{} +} - advancedVisibilityDataStore, ok := c.DataStores[c.AdvancedVisibilityStore] - if !ok { - return fmt.Errorf("persistence config: advanced visibility datastore %q: missing config", c.AdvancedVisibilityStore) +func (c *Persistence) GetSecondaryVisibilityStoreConfig() DataStore { + if c.SecondaryVisibilityStore != "" { + return c.DataStores[c.SecondaryVisibilityStore] } - - if err := advancedVisibilityDataStore.Elasticsearch.Validate(c.AdvancedVisibilityStore); err != nil { - return err + if c.VisibilityStore != "" { + if c.AdvancedVisibilityStore != "" { + return c.DataStores[c.AdvancedVisibilityStore] + } + ds := c.DataStores[c.VisibilityStore] + if ds.Elasticsearch != nil && ds.Elasticsearch.GetSecondaryVisibilityIndex() != "" { + esConfig := *ds.Elasticsearch + esConfig.Indices = map[string]string{ + client.VisibilityAppName: ds.Elasticsearch.GetSecondaryVisibilityIndex(), + } + ds.Elasticsearch = &esConfig + return ds + } + } + if c.AdvancedVisibilityStore != "" { + ds := c.DataStores[c.AdvancedVisibilityStore] + if ds.Elasticsearch != nil && ds.Elasticsearch.GetSecondaryVisibilityIndex() != "" { + esConfig := *ds.Elasticsearch + esConfig.Indices = map[string]string{ + client.VisibilityAppName: ds.Elasticsearch.GetSecondaryVisibilityIndex(), + } + ds.Elasticsearch = &esConfig + return ds + } } + return DataStore{} +} - return nil +func (ds *DataStore) GetIndexName() string { + switch { + case ds.SQL != nil: + return ds.SQL.DatabaseName + case ds.Cassandra != nil: + return ds.Cassandra.Keyspace + case ds.Elasticsearch != nil: + return ds.Elasticsearch.GetVisibilityIndex() + default: + return "" + } } // Validate validates the data store config @@ -118,8 +229,14 @@ func (ds *DataStore) Validate() error { if ds.CustomDataStoreConfig != nil { storeConfigCount++ } + if ds.Elasticsearch != nil { + storeConfigCount++ + } if storeConfigCount != 1 { - return errors.New("must provide config for one and only one for DataStore of cassandra or sql or custom stores") + return errors.New( + "must provide config for one and only one datastore: " + + "elasticsearch, cassandra, sql or custom store", + ) } if ds.SQL != nil && ds.SQL.TaskScanPartitions == 0 { @@ -130,6 +247,11 @@ func (ds *DataStore) Validate() error { return err } } + if ds.Elasticsearch != nil { + if err := ds.Elasticsearch.Validate(); err != nil { + return err + } + } return nil } diff --git a/common/dynamicconfig/collection.go b/common/dynamicconfig/collection.go index 3e8d18849a5..b1b8a564c75 100644 --- a/common/dynamicconfig/collection.go +++ b/common/dynamicconfig/collection.go @@ -406,6 +406,11 @@ func (c *Collection) GetTaskQueuePartitionsProperty(key Key) IntPropertyFnWithTa return c.GetIntPropertyFilteredByTaskQueueInfo(key, defaultNumTaskQueuePartitions) } +func (c *Collection) HasKey(key Key) bool { + cvs := c.client.GetValue(key) + return len(cvs) > 0 +} + func findMatch(cvs, defaultCVs []ConstrainedValue, precedence []Constraints) (any, error) { if len(cvs)+len(defaultCVs) == 0 { return nil, errKeyNotPresent diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 19ecca0e582..333a0abc997 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -38,6 +38,7 @@ const ( // keys for system + // DEPRECATED: the following block of configs are deprecated and replaced by the next block of configs // StandardVisibilityPersistenceMaxReadQPS is the max QPC system host can query standard visibility DB (SQL or Cassandra) for read. StandardVisibilityPersistenceMaxReadQPS = "system.standardVisibilityPersistenceMaxReadQPS" // StandardVisibilityPersistenceMaxWriteQPS is the max QPC system host can query standard visibility DB (SQL or Cassandra) for write. @@ -54,6 +55,15 @@ const ( EnableReadVisibilityFromES = "system.enableReadVisibilityFromES" // EnableReadFromSecondaryAdvancedVisibility is the config to enable read from secondary Elasticsearch EnableReadFromSecondaryAdvancedVisibility = "system.enableReadFromSecondaryAdvancedVisibility" + + // VisibilityPersistenceMaxReadQPS is the max QPC system host can query visibility DB for read. + VisibilityPersistenceMaxReadQPS = "system.visibilityPersistenceMaxReadQPS" + // VisibilityPersistenceMaxWriteQPS is the max QPC system host can query visibility DB for write. + VisibilityPersistenceMaxWriteQPS = "system.visibilityPersistenceMaxWriteQPS" + // EnableReadFromSecondaryVisibility is the config to enable read from secondary visibility + EnableReadFromSecondaryVisibility = "system.enableReadFromSecondaryVisibility" + // SecondaryVisibilityWritingMode is key for how to write to secondary visibility + SecondaryVisibilityWritingMode = "system.secondaryVisibilityWritingMode" // VisibilityDisableOrderByClause is the config to disable ORDERY BY clause for Elasticsearch VisibilityDisableOrderByClause = "system.visibilityDisableOrderByClause" diff --git a/common/persistence/tests/visibility_persistence_suite_test.go b/common/persistence/tests/visibility_persistence_suite_test.go index 25a340a2662..f80c24becfc 100644 --- a/common/persistence/tests/visibility_persistence_suite_test.go +++ b/common/persistence/tests/visibility_persistence_suite_test.go @@ -78,13 +78,18 @@ func (s *VisibilityPersistenceSuite) SetupSuite() { s.controller = gomock.NewController(s.T()) s.SearchAttributesProvider = searchattribute.NewTestProvider() s.SearchAttributesMapperProvider = searchattribute.NewTestMapperProvider(nil) - s.VisibilityMgr, err = visibility.NewStandardManager( + s.VisibilityMgr, err = visibility.NewManager( cfg, resolver.NewNoopResolver(), + nil, + nil, s.SearchAttributesProvider, s.SearchAttributesMapperProvider, dynamicconfig.GetIntPropertyFn(1000), dynamicconfig.GetIntPropertyFn(1000), + dynamicconfig.GetBoolPropertyFnFilteredByNamespace(false), + dynamicconfig.GetStringPropertyFn(visibility.SecondaryVisibilityWritingModeOff), + dynamicconfig.GetBoolPropertyFnFilteredByNamespace(false), metrics.NoopMetricsHandler, s.Logger, ) diff --git a/common/persistence/visibility/defs.go b/common/persistence/visibility/defs.go index 5d491de68ec..5fe23f595fc 100644 --- a/common/persistence/visibility/defs.go +++ b/common/persistence/visibility/defs.go @@ -25,6 +25,7 @@ package visibility import ( + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/persistence/sql/sqlplugin/mysql" "go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql" "go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite" @@ -47,6 +48,91 @@ func DefaultAdvancedVisibilityWritingMode(advancedVisibilityConfigExist bool) st return SecondaryVisibilityWritingModeOff } +//nolint:revive +func GetVisibilityPersistenceMaxReadQPS( + dc *dynamicconfig.Collection, + advancedVisibilityStoreConfigExists bool, +) dynamicconfig.IntPropertyFn { + if dc.HasKey(dynamicconfig.VisibilityPersistenceMaxReadQPS) { + return dc.GetIntProperty(dynamicconfig.VisibilityPersistenceMaxReadQPS, 9000) + } + if advancedVisibilityStoreConfigExists { + return dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxReadQPS, 9000) + } + return dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxReadQPS, 9000) +} + +//nolint:revive +func GetVisibilityPersistenceMaxWriteQPS( + dc *dynamicconfig.Collection, + advancedVisibilityStoreConfigExists bool, +) dynamicconfig.IntPropertyFn { + if dc.HasKey(dynamicconfig.VisibilityPersistenceMaxWriteQPS) { + return dc.GetIntProperty(dynamicconfig.VisibilityPersistenceMaxWriteQPS, 9000) + } + if advancedVisibilityStoreConfigExists { + return dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxWriteQPS, 9000) + } + return dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxWriteQPS, 9000) +} + +//nolint:revive +func GetEnableReadFromSecondaryVisibilityConfig( + dc *dynamicconfig.Collection, + visibilityStoreConfigExists bool, + advancedVisibilityStoreConfigExists bool, +) dynamicconfig.BoolPropertyFnWithNamespaceFilter { + if dc.HasKey(dynamicconfig.EnableReadFromSecondaryVisibility) { + return dc.GetBoolPropertyFnWithNamespaceFilter( + dynamicconfig.EnableReadFromSecondaryVisibility, + false, + ) + } + if visibilityStoreConfigExists && advancedVisibilityStoreConfigExists { + return dc.GetBoolPropertyFnWithNamespaceFilter( + dynamicconfig.EnableReadVisibilityFromES, + advancedVisibilityStoreConfigExists, + ) + } + if advancedVisibilityStoreConfigExists { + return dc.GetBoolPropertyFnWithNamespaceFilter( + dynamicconfig.EnableReadFromSecondaryAdvancedVisibility, + false, + ) + } + return dynamicconfig.GetBoolPropertyFnFilteredByNamespace(false) +} + +//nolint:revive +func GetSecondaryVisibilityWritingModeConfig( + dc *dynamicconfig.Collection, + visibilityStoreConfigExists bool, + advancedVisibilityStoreConfigExists bool, +) dynamicconfig.StringPropertyFn { + if dc.HasKey(dynamicconfig.SecondaryVisibilityWritingMode) { + return dc.GetStringProperty( + dynamicconfig.SecondaryVisibilityWritingMode, + SecondaryVisibilityWritingModeOff, + ) + } + if visibilityStoreConfigExists && advancedVisibilityStoreConfigExists { + return dc.GetStringProperty( + dynamicconfig.AdvancedVisibilityWritingMode, + DefaultAdvancedVisibilityWritingMode(advancedVisibilityStoreConfigExists), + ) + } + if advancedVisibilityStoreConfigExists { + enableWriteToSecondaryAdvancedVisibility := dc.GetBoolProperty( + dynamicconfig.EnableWriteToSecondaryAdvancedVisibility, + false, + ) + if enableWriteToSecondaryAdvancedVisibility() { + return dynamicconfig.GetStringPropertyFn(SecondaryVisibilityWritingModeDual) + } + } + return dynamicconfig.GetStringPropertyFn(SecondaryVisibilityWritingModeOff) +} + func AllowListForValidation(storeNames []string) bool { if len(storeNames) == 0 { return false diff --git a/common/persistence/visibility/factory.go b/common/persistence/visibility/factory.go index e2daee35e06..71d778bc946 100644 --- a/common/persistence/visibility/factory.go +++ b/common/persistence/visibility/factory.go @@ -48,48 +48,29 @@ func NewManager( persistenceCfg config.Persistence, persistenceResolver resolver.ServiceResolver, - defaultIndexName string, - secondaryVisibilityIndexName string, esClient esclient.Client, esProcessorConfig *elasticsearch.ProcessorConfig, searchAttributesProvider searchattribute.Provider, searchAttributesMapperProvider searchattribute.MapperProvider, - standardVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn, - standardVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn, - advancedVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn, - advancedVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn, - enableAdvancedVisibilityRead dynamicconfig.BoolPropertyFnWithNamespaceFilter, - advancedVisibilityWritingMode dynamicconfig.StringPropertyFn, - enableReadFromSecondaryAdvancedVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter, - enableWriteToSecondaryAdvancedVisibility dynamicconfig.BoolPropertyFn, + maxReadQPS dynamicconfig.IntPropertyFn, + maxWriteQPS dynamicconfig.IntPropertyFn, + enableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter, + secondaryVisibilityWritingMode dynamicconfig.StringPropertyFn, visibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter, metricsHandler metrics.Handler, logger log.Logger, ) (manager.VisibilityManager, error) { - stdVisibilityManager, err := NewStandardManager( - persistenceCfg, + visibilityManager, err := newVisibilityManagerFromDataStoreConfig( + persistenceCfg.GetVisibilityStoreConfig(), persistenceResolver, - searchAttributesProvider, - searchAttributesMapperProvider, - standardVisibilityPersistenceMaxReadQPS, - standardVisibilityPersistenceMaxWriteQPS, - metricsHandler, - logger, - ) - if err != nil { - return nil, err - } - - advVisibilityManager, err := NewAdvancedManager( - defaultIndexName, esClient, esProcessorConfig, searchAttributesProvider, searchAttributesMapperProvider, - advancedVisibilityPersistenceMaxReadQPS, - advancedVisibilityPersistenceMaxWriteQPS, + maxReadQPS, + maxWriteQPS, visibilityDisableOrderByClause, metricsHandler, logger, @@ -97,15 +78,20 @@ func NewManager( if err != nil { return nil, err } + if visibilityManager == nil { + logger.Fatal("invalid config: visibility store must be configured") + return nil, nil + } - secondaryVisibilityManager, err := NewAdvancedManager( - secondaryVisibilityIndexName, + secondaryVisibilityManager, err := newVisibilityManagerFromDataStoreConfig( + persistenceCfg.GetSecondaryVisibilityStoreConfig(), + persistenceResolver, esClient, esProcessorConfig, searchAttributesProvider, searchAttributesMapperProvider, - advancedVisibilityPersistenceMaxReadQPS, - advancedVisibilityPersistenceMaxWriteQPS, + maxReadQPS, + maxWriteQPS, visibilityDisableOrderByClause, metricsHandler, logger, @@ -114,207 +100,190 @@ func NewManager( return nil, err } - if stdVisibilityManager == nil && advVisibilityManager == nil { - logger.Fatal("invalid config: one of standard or advanced visibility must be configured") - return nil, nil - } - - if stdVisibilityManager != nil && secondaryVisibilityManager != nil { - logger.Fatal("invalid config: secondary visibility store cannot be used with standard visibility") - return nil, nil - } - - if stdVisibilityManager != nil && advVisibilityManager == nil { - return stdVisibilityManager, nil - } - - if stdVisibilityManager == nil && advVisibilityManager != nil { - if secondaryVisibilityManager == nil { - return advVisibilityManager, nil + if secondaryVisibilityManager != nil { + isPrimaryAdvancedSQL := false + isSecondaryAdvancedSQL := false + switch visibilityManager.GetStoreNames()[0] { + case mysql.PluginNameV8, postgresql.PluginNameV12, sqlite.PluginName: + isPrimaryAdvancedSQL = true + } + switch secondaryVisibilityManager.GetStoreNames()[0] { + case mysql.PluginNameV8, postgresql.PluginNameV12, sqlite.PluginName: + isSecondaryAdvancedSQL = true + } + if isPrimaryAdvancedSQL && !isSecondaryAdvancedSQL { + logger.Fatal("invalid config: dual visibility combination not supported") + return nil, nil } - // Dual write to primary and secondary ES indices. - managerSelector := NewESManagerSelector( - advVisibilityManager, + managerSelector := newDefaultManagerSelector( + visibilityManager, secondaryVisibilityManager, - enableReadFromSecondaryAdvancedVisibility, - enableWriteToSecondaryAdvancedVisibility) - + enableReadFromSecondaryVisibility, + secondaryVisibilityWritingMode, + ) return NewVisibilityManagerDual( - advVisibilityManager, + visibilityManager, secondaryVisibilityManager, managerSelector, ), nil } - // Dual write to standard and advanced visibility. - managerSelector := NewSQLToESManagerSelector( - stdVisibilityManager, - advVisibilityManager, - enableAdvancedVisibilityRead, - advancedVisibilityWritingMode) - return NewVisibilityManagerDual( - stdVisibilityManager, - advVisibilityManager, - managerSelector, - ), nil + return visibilityManager, nil } -func NewStandardManager( - persistenceCfg config.Persistence, - persistenceResolver resolver.ServiceResolver, - searchAttributesProvider searchattribute.Provider, - searchAttributesMapperProvider searchattribute.MapperProvider, - - standardVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn, - standardVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn, - +func newVisibilityManager( + visStore store.VisibilityStore, + maxReadQPS dynamicconfig.IntPropertyFn, + maxWriteQPS dynamicconfig.IntPropertyFn, metricsHandler metrics.Handler, + tag metrics.Tag, logger log.Logger, -) (manager.VisibilityManager, error) { - - stdVisibilityStore, err := newStandardVisibilityStore( - persistenceCfg, - persistenceResolver, - searchAttributesProvider, - searchAttributesMapperProvider, - logger) - if err != nil { - return nil, err +) manager.VisibilityManager { + if visStore == nil { + return nil } + var visManager manager.VisibilityManager = newVisibilityManagerImpl(visStore, logger) - return newVisibilityManager( - stdVisibilityStore, - standardVisibilityPersistenceMaxReadQPS, - standardVisibilityPersistenceMaxWriteQPS, + // wrap with rate limiter + visManager = NewVisibilityManagerRateLimited( + visManager, + maxReadQPS, + maxWriteQPS) + // wrap with metrics client + visManager = NewVisibilityManagerMetrics( + visManager, metricsHandler, - metrics.StandardVisibilityTypeTag(), - logger), nil + logger, + tag) + + return visManager } -func NewAdvancedManager( - defaultIndexName string, +//nolint:revive // too many arguments +func newVisibilityManagerFromDataStoreConfig( + dsConfig config.DataStore, + persistenceResolver resolver.ServiceResolver, + esClient esclient.Client, esProcessorConfig *elasticsearch.ProcessorConfig, searchAttributesProvider searchattribute.Provider, searchAttributesMapperProvider searchattribute.MapperProvider, - advancedVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn, - advancedVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn, + maxReadQPS dynamicconfig.IntPropertyFn, + maxWriteQPS dynamicconfig.IntPropertyFn, visibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter, metricsHandler metrics.Handler, logger log.Logger, ) (manager.VisibilityManager, error) { - if defaultIndexName == "" { - return nil, nil - } - - advVisibilityStore := newAdvancedVisibilityStore( - defaultIndexName, + visStore, err := newVisibilityStoreFromDataStoreConfig( + dsConfig, + persistenceResolver, esClient, esProcessorConfig, searchAttributesProvider, searchAttributesMapperProvider, visibilityDisableOrderByClause, metricsHandler, - logger) - - return newVisibilityManager( - advVisibilityStore, - advancedVisibilityPersistenceMaxReadQPS, - advancedVisibilityPersistenceMaxWriteQPS, - metricsHandler, - metrics.AdvancedVisibilityTypeTag(), logger, - ), nil -} - -func newVisibilityManager( - store store.VisibilityStore, - maxReadQPS dynamicconfig.IntPropertyFn, - maxWriteQPS dynamicconfig.IntPropertyFn, - metricsHandler metrics.Handler, - tag metrics.Tag, - logger log.Logger, -) manager.VisibilityManager { - if store == nil { - return nil + ) + if err != nil { + return nil, err } - - var manager manager.VisibilityManager = newVisibilityManagerImpl(store, logger) - - // wrap with rate limiter - manager = NewVisibilityManagerRateLimited( - manager, + if visStore == nil { + return nil, nil + } + return newVisibilityManager( + visStore, maxReadQPS, - maxWriteQPS) - // wrap with metrics client - manager = NewVisibilityManagerMetrics( - manager, + maxWriteQPS, metricsHandler, + metrics.AdvancedVisibilityTypeTag(), logger, - tag) - - return manager + ), nil } -func newStandardVisibilityStore( - persistenceCfg config.Persistence, +func newVisibilityStoreFromDataStoreConfig( + dsConfig config.DataStore, persistenceResolver resolver.ServiceResolver, + + esClient esclient.Client, + esProcessorConfig *elasticsearch.ProcessorConfig, searchAttributesProvider searchattribute.Provider, searchAttributesMapperProvider searchattribute.MapperProvider, + visibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter, + + metricsHandler metrics.Handler, logger log.Logger, ) (store.VisibilityStore, error) { - // If standard visibility is not configured. - if persistenceCfg.VisibilityStore == "" { - return nil, nil - } - - visibilityStoreCfg := persistenceCfg.DataStores[persistenceCfg.VisibilityStore] - var ( - visStore store.VisibilityStore - isStandard bool - err error + visStore store.VisibilityStore + err error ) - switch { - case visibilityStoreCfg.Cassandra != nil: - visStore, err = cassandra.NewVisibilityStore(*visibilityStoreCfg.Cassandra, persistenceResolver, logger) - isStandard = true - case visibilityStoreCfg.SQL != nil: - switch visibilityStoreCfg.SQL.PluginName { + if dsConfig.SQL != nil { + switch dsConfig.SQL.PluginName { case mysql.PluginNameV8, postgresql.PluginNameV12, sqlite.PluginName: - isStandard = false visStore, err = sql.NewSQLVisibilityStore( - *visibilityStoreCfg.SQL, + *dsConfig.SQL, persistenceResolver, searchAttributesProvider, searchAttributesMapperProvider, logger, ) default: - isStandard = true - visStore, err = standardSql.NewSQLVisibilityStore(*visibilityStoreCfg.SQL, persistenceResolver, logger) + visStore, err = newStandardVisibilityStore(dsConfig, persistenceResolver, logger) } + } else if dsConfig.Cassandra != nil { + visStore, err = newStandardVisibilityStore(dsConfig, persistenceResolver, logger) + } else if dsConfig.Elasticsearch != nil { + visStore = newElasticsearchVisibilityStore( + dsConfig.Elasticsearch.GetVisibilityIndex(), + esClient, + esProcessorConfig, + searchAttributesProvider, + searchAttributesMapperProvider, + visibilityDisableOrderByClause, + metricsHandler, + logger, + ) } + return visStore, err +} +func newStandardVisibilityStore( + dsConfig config.DataStore, + persistenceResolver resolver.ServiceResolver, + logger log.Logger, +) (store.VisibilityStore, error) { + var ( + visStore store.VisibilityStore + err error + ) + if dsConfig.Cassandra != nil { + visStore, err = cassandra.NewVisibilityStore( + *dsConfig.Cassandra, + persistenceResolver, + logger, + ) + } else if dsConfig.SQL != nil { + visStore, err = standardSql.NewSQLVisibilityStore( + *dsConfig.SQL, + persistenceResolver, + logger, + ) + } if err != nil { return nil, err } - if visStore == nil { logger.Fatal("invalid config: one of cassandra or sql params must be specified for visibility store") return nil, nil } - - if isStandard { - return standard.NewVisibilityStore(visStore), nil - } - return visStore, nil + return standard.NewVisibilityStore(visStore), nil } -func newAdvancedVisibilityStore( +func newElasticsearchVisibilityStore( defaultIndexName string, esClient esclient.Client, esProcessorConfig *elasticsearch.ProcessorConfig, diff --git a/common/persistence/visibility/manager_selector.go b/common/persistence/visibility/manager_selector.go index c917e3d7aaf..436217fdd0e 100644 --- a/common/persistence/visibility/manager_selector.go +++ b/common/persistence/visibility/manager_selector.go @@ -36,87 +36,52 @@ import ( type ( managerSelector interface { - readManager(namespace namespace.Name) manager.VisibilityManager + readManager(nsName namespace.Name) manager.VisibilityManager writeManagers() ([]manager.VisibilityManager, error) } - sqlToESManagerSelector struct { - enableAdvancedVisibilityRead dynamicconfig.BoolPropertyFnWithNamespaceFilter - advancedVisibilityWritingMode dynamicconfig.StringPropertyFn - stdVisibilityManager manager.VisibilityManager - advVisibilityManager manager.VisibilityManager - } - - esManagerSelector struct { - enableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter - enableWriteToSecondaryVisibility dynamicconfig.BoolPropertyFn + defaultManagerSelector struct { visibilityManager manager.VisibilityManager secondaryVisibilityManager manager.VisibilityManager + enableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter + secondaryVisibilityWritingMode dynamicconfig.StringPropertyFn } ) -var _ managerSelector = (*sqlToESManagerSelector)(nil) -var _ managerSelector = (*esManagerSelector)(nil) - -func NewSQLToESManagerSelector( - stdVisibilityManager manager.VisibilityManager, - advVisibilityManager manager.VisibilityManager, - enableAdvancedVisibilityRead dynamicconfig.BoolPropertyFnWithNamespaceFilter, - advancedVisibilityWritingMode dynamicconfig.StringPropertyFn, -) *sqlToESManagerSelector { - return &sqlToESManagerSelector{ - stdVisibilityManager: stdVisibilityManager, - advVisibilityManager: advVisibilityManager, - enableAdvancedVisibilityRead: enableAdvancedVisibilityRead, - advancedVisibilityWritingMode: advancedVisibilityWritingMode, - } -} +var _ managerSelector = (*defaultManagerSelector)(nil) -func NewESManagerSelector( +func newDefaultManagerSelector( visibilityManager manager.VisibilityManager, secondaryVisibilityManager manager.VisibilityManager, - enableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter, - enableWriteToSecondaryVisibility dynamicconfig.BoolPropertyFn, -) *esManagerSelector { - return &esManagerSelector{ + enableSecondaryVisibilityRead dynamicconfig.BoolPropertyFnWithNamespaceFilter, + secondaryVisibilityWritingMode dynamicconfig.StringPropertyFn, +) *defaultManagerSelector { + return &defaultManagerSelector{ visibilityManager: visibilityManager, secondaryVisibilityManager: secondaryVisibilityManager, - enableReadFromSecondaryVisibility: enableReadFromSecondaryVisibility, - enableWriteToSecondaryVisibility: enableWriteToSecondaryVisibility, + enableReadFromSecondaryVisibility: enableSecondaryVisibilityRead, + secondaryVisibilityWritingMode: secondaryVisibilityWritingMode, } } -func (v *sqlToESManagerSelector) writeManagers() ([]manager.VisibilityManager, error) { - switch v.advancedVisibilityWritingMode() { +func (v *defaultManagerSelector) writeManagers() ([]manager.VisibilityManager, error) { + switch v.secondaryVisibilityWritingMode() { case SecondaryVisibilityWritingModeOff: - return []manager.VisibilityManager{v.stdVisibilityManager}, nil + return []manager.VisibilityManager{v.visibilityManager}, nil case SecondaryVisibilityWritingModeOn: - return []manager.VisibilityManager{v.advVisibilityManager}, nil + return []manager.VisibilityManager{v.secondaryVisibilityManager}, nil case SecondaryVisibilityWritingModeDual: - return []manager.VisibilityManager{v.stdVisibilityManager, v.advVisibilityManager}, nil + return []manager.VisibilityManager{v.visibilityManager, v.secondaryVisibilityManager}, nil default: - return nil, serviceerror.NewInternal(fmt.Sprintf("Unknown advanced visibility writing mode: %s", v.advancedVisibilityWritingMode())) + return nil, serviceerror.NewInternal(fmt.Sprintf( + "Unknown secondary visibility writing mode: %s", + v.secondaryVisibilityWritingMode(), + )) } } -func (v *sqlToESManagerSelector) readManager(namespace namespace.Name) manager.VisibilityManager { - if v.enableAdvancedVisibilityRead(namespace.String()) { - return v.advVisibilityManager - } - return v.stdVisibilityManager -} - -func (v *esManagerSelector) writeManagers() ([]manager.VisibilityManager, error) { - managers := []manager.VisibilityManager{v.visibilityManager} - if v.enableWriteToSecondaryVisibility() { - managers = append(managers, v.secondaryVisibilityManager) - } - - return managers, nil -} - -func (v *esManagerSelector) readManager(namespace namespace.Name) manager.VisibilityManager { - if v.enableReadFromSecondaryVisibility(namespace.String()) { +func (v *defaultManagerSelector) readManager(nsName namespace.Name) manager.VisibilityManager { + if v.enableReadFromSecondaryVisibility(nsName.String()) { return v.secondaryVisibilityManager } return v.visibilityManager diff --git a/common/persistence/visibility/store/elasticsearch/client/config.go b/common/persistence/visibility/store/elasticsearch/client/config.go index 07bc6836801..5880880726b 100644 --- a/common/persistence/visibility/store/elasticsearch/client/config.go +++ b/common/persistence/visibility/store/elasticsearch/client/config.go @@ -25,6 +25,7 @@ package client import ( + "errors" "fmt" "net/url" "time" @@ -94,17 +95,15 @@ func (cfg *Config) GetSecondaryVisibilityIndex() string { return cfg.Indices[SecondaryVisibilityAppName] } -func (cfg *Config) Validate(storeName string) error { +func (cfg *Config) Validate() error { if cfg == nil { - return fmt.Errorf("persistence config: advanced visibility datastore %q: must provide config for \"elasticsearch\"", storeName) + return errors.New("elasticsearch config: config not found") } - if len(cfg.Indices) < 1 { - return fmt.Errorf("persistence config: advanced visibility datastore %q: missing indices", storeName) - + return errors.New("elasticsearch config: missing indices") } if cfg.Indices[VisibilityAppName] == "" { - return fmt.Errorf("persistence config: advanced visibility datastore %q indices configuration: missing %q key", storeName, VisibilityAppName) + return fmt.Errorf("elasticsearch config: indices configuration: missing %q key", VisibilityAppName) } return nil } diff --git a/service/frontend/dcRedirectionPolicy_test.go b/service/frontend/dcRedirectionPolicy_test.go index 5a91f20ae45..d39ae8cef2e 100644 --- a/service/frontend/dcRedirectionPolicy_test.go +++ b/service/frontend/dcRedirectionPolicy_test.go @@ -138,7 +138,7 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) SetupTest() { logger := log.NewTestLogger() - s.mockConfig = NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewNoopClient(), logger), 0, false) + s.mockConfig = NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewNoopClient(), logger), 0, true, false) s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes() s.policy = NewSelectedAPIsForwardingPolicy( s.currentClusterName, diff --git a/service/frontend/fx.go b/service/frontend/fx.go index b1330462f26..df4b1a50b44 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -220,6 +220,7 @@ func ConfigProvider( return NewConfig( dc, persistenceConfig.NumHistoryShards, + persistenceConfig.StandardVisibilityConfigExist(), persistenceConfig.AdvancedVisibilityConfigExist(), ) } @@ -390,7 +391,6 @@ func VisibilityManagerProvider( persistenceConfig *config.Persistence, metricsHandler metrics.Handler, serviceConfig *Config, - esConfig *esclient.Config, esClient esclient.Client, persistenceServiceResolver resolver.ServiceResolver, searchAttributesMapperProvider searchattribute.MapperProvider, @@ -399,20 +399,14 @@ func VisibilityManagerProvider( return visibility.NewManager( *persistenceConfig, persistenceServiceResolver, - esConfig.GetVisibilityIndex(), - esConfig.GetSecondaryVisibilityIndex(), esClient, nil, // frontend visibility never write saProvider, searchAttributesMapperProvider, - serviceConfig.StandardVisibilityPersistenceMaxReadQPS, - serviceConfig.StandardVisibilityPersistenceMaxWriteQPS, - serviceConfig.AdvancedVisibilityPersistenceMaxReadQPS, - serviceConfig.AdvancedVisibilityPersistenceMaxWriteQPS, - serviceConfig.EnableReadVisibilityFromES, + serviceConfig.VisibilityPersistenceMaxReadQPS, + serviceConfig.VisibilityPersistenceMaxWriteQPS, + serviceConfig.EnableReadFromSecondaryVisibility, dynamicconfig.GetStringPropertyFn(visibility.SecondaryVisibilityWritingModeOff), // frontend visibility never write - serviceConfig.EnableReadFromSecondaryAdvancedVisibility, - dynamicconfig.GetBoolPropertyFn(false), // frontend visibility never write serviceConfig.VisibilityDisableOrderByClause, metricsHandler, logger, diff --git a/service/frontend/redirection_interceptor_test.go b/service/frontend/redirection_interceptor_test.go index 1b3e611337f..af2932938f3 100644 --- a/service/frontend/redirection_interceptor_test.go +++ b/service/frontend/redirection_interceptor_test.go @@ -87,6 +87,7 @@ func (s *redirectionInterceptorSuite) SetupTest() { NewConfig( dynamicconfig.NewNoopCollection(), 1, + true, false, ), s.namespaceCache, diff --git a/service/frontend/service.go b/service/frontend/service.go index af2c79dd594..4ee93bfddf5 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -47,6 +47,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence/client" + "go.temporal.io/server/common/persistence/visibility" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/util" ) @@ -59,14 +60,11 @@ type Config struct { PersistenceNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter EnablePersistencePriorityRateLimiting dynamicconfig.BoolPropertyFn - StandardVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn - StandardVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn - AdvancedVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn - AdvancedVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn - VisibilityMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter - EnableReadVisibilityFromES dynamicconfig.BoolPropertyFnWithNamespaceFilter - EnableReadFromSecondaryAdvancedVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter - VisibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter + VisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn + VisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn + VisibilityMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter + EnableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter + VisibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter HistoryMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter RPS dynamicconfig.IntPropertyFn @@ -169,7 +167,12 @@ type Config struct { } // NewConfig returns new service config with default values -func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int32, enableReadFromES bool) *Config { +func NewConfig( + dc *dynamicconfig.Collection, + numHistoryShards int32, + visibilityStoreConfigExist bool, + enableReadFromES bool, +) *Config { return &Config{ NumHistoryShards: numHistoryShards, PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.FrontendPersistenceMaxQPS, 2000), @@ -177,14 +180,11 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int32, enableReadF PersistenceNamespaceMaxQPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendPersistenceNamespaceMaxQPS, 0), EnablePersistencePriorityRateLimiting: dc.GetBoolProperty(dynamicconfig.FrontendEnablePersistencePriorityRateLimiting, true), - StandardVisibilityPersistenceMaxReadQPS: dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxReadQPS, 9000), - StandardVisibilityPersistenceMaxWriteQPS: dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxWriteQPS, 9000), - AdvancedVisibilityPersistenceMaxReadQPS: dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxReadQPS, 9000), - AdvancedVisibilityPersistenceMaxWriteQPS: dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxWriteQPS, 9000), - VisibilityMaxPageSize: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendVisibilityMaxPageSize, 1000), - EnableReadVisibilityFromES: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableReadVisibilityFromES, enableReadFromES), - EnableReadFromSecondaryAdvancedVisibility: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableReadFromSecondaryAdvancedVisibility, false), - VisibilityDisableOrderByClause: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.VisibilityDisableOrderByClause, false), + VisibilityPersistenceMaxReadQPS: visibility.GetVisibilityPersistenceMaxReadQPS(dc, enableReadFromES), + VisibilityPersistenceMaxWriteQPS: visibility.GetVisibilityPersistenceMaxWriteQPS(dc, enableReadFromES), + VisibilityMaxPageSize: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendVisibilityMaxPageSize, 1000), + EnableReadFromSecondaryVisibility: visibility.GetEnableReadFromSecondaryVisibilityConfig(dc, visibilityStoreConfigExist, enableReadFromES), + VisibilityDisableOrderByClause: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.VisibilityDisableOrderByClause, false), HistoryMaxPageSize: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendHistoryMaxPageSize, common.GetHistoryMaxPageSize), RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 2400), diff --git a/service/frontend/workflow_handler_test.go b/service/frontend/workflow_handler_test.go index a4092a380a9..8a2aadece1c 100644 --- a/service/frontend/workflow_handler_test.go +++ b/service/frontend/workflow_handler_test.go @@ -201,6 +201,7 @@ func (s *workflowHandlerSuite) TestDisableListVisibilityByFilter() { wh := s.getWorkflowHandler(config) s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(namespaceID, nil).AnyTimes() + s.mockVisibilityMgr.EXPECT().GetReadStoreName(testNamespace).Return("").AnyTimes() // test list open by wid listRequest := &workflowservice.ListOpenWorkflowExecutionsRequest{ @@ -1781,9 +1782,7 @@ func (s *workflowHandlerSuite) TestGetSearchAttributes() { } func (s *workflowHandlerSuite) TestDescribeWorkflowExecution_RunningStatus() { - config := s.newConfig() - config.EnableReadVisibilityFromES = dc.GetBoolPropertyFnFilteredByNamespace(true) - wh := s.getWorkflowHandler(config) + wh := s.getWorkflowHandler(s.newConfig()) now := timestamp.TimePtr(time.Now()) s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return( @@ -1819,9 +1818,7 @@ func (s *workflowHandlerSuite) TestDescribeWorkflowExecution_RunningStatus() { } func (s *workflowHandlerSuite) TestDescribeWorkflowExecution_CompletedStatus() { - config := s.newConfig() - config.EnableReadVisibilityFromES = dc.GetBoolPropertyFnFilteredByNamespace(true) - wh := s.getWorkflowHandler(config) + wh := s.getWorkflowHandler(s.newConfig()) now := timestamp.TimePtr(time.Now()) s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return( @@ -1858,10 +1855,9 @@ func (s *workflowHandlerSuite) TestDescribeWorkflowExecution_CompletedStatus() { func (s *workflowHandlerSuite) TestListWorkflowExecutions() { config := s.newConfig() - config.EnableReadVisibilityFromES = dc.GetBoolPropertyFnFilteredByNamespace(true) - wh := s.getWorkflowHandler(config) s.mockNamespaceCache.EXPECT().GetNamespaceID(s.testNamespace).Return(s.testNamespaceID, nil).AnyTimes() + s.mockVisibilityMgr.EXPECT().GetReadStoreName(s.testNamespace).Return(elasticsearch.PersistenceName).AnyTimes() query := "WorkflowId = 'wid'" listRequest := &workflowservice.ListWorkflowExecutionsRequest{ @@ -1921,9 +1917,9 @@ func (s *workflowHandlerSuite) TestListWorkflowExecutions() { func (s *workflowHandlerSuite) TestScanWorkflowExecutions() { config := s.newConfig() - config.EnableReadVisibilityFromES = dc.GetBoolPropertyFnFilteredByNamespace(true) wh := s.getWorkflowHandler(config) s.mockNamespaceCache.EXPECT().GetNamespaceID(s.testNamespace).Return(s.testNamespaceID, nil).AnyTimes() + s.mockVisibilityMgr.EXPECT().GetReadStoreName(s.testNamespace).Return(elasticsearch.PersistenceName).AnyTimes() query := "WorkflowId = 'wid'" scanRequest := &workflowservice.ScanWorkflowExecutionsRequest{ @@ -1982,9 +1978,7 @@ func (s *workflowHandlerSuite) TestScanWorkflowExecutions() { } func (s *workflowHandlerSuite) TestCountWorkflowExecutions() { - config := s.newConfig() - config.EnableReadVisibilityFromES = dc.GetBoolPropertyFnFilteredByNamespace(true) - wh := s.getWorkflowHandler(config) + wh := s.getWorkflowHandler(s.newConfig()) s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(s.testNamespaceID, nil).AnyTimes() s.mockVisibilityMgr.EXPECT().CountWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&manager.CountWorkflowExecutionsResponse{Count: 5}, nil) @@ -2057,10 +2051,7 @@ func (s *workflowHandlerSuite) TestVerifyHistoryIsComplete() { } func (s *workflowHandlerSuite) TestGetSystemInfo() { - config := s.newConfig() - config.EnableReadVisibilityFromES = dc.GetBoolPropertyFnFilteredByNamespace(true) - - wh := s.getWorkflowHandler(config) + wh := s.getWorkflowHandler(s.newConfig()) resp, err := wh.GetSystemInfo(context.Background(), &workflowservice.GetSystemInfoRequest{}) s.NoError(err) @@ -2626,6 +2617,7 @@ func (s *workflowHandlerSuite) TestListBatchOperations() { wh := s.getWorkflowHandler(config) now := timestamp.TimePtr(time.Now()) s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(namespaceID, nil).AnyTimes() + s.mockVisibilityMgr.EXPECT().GetReadStoreName(testNamespace).Return("").AnyTimes() s.mockVisibilityMgr.EXPECT().ListWorkflowExecutions(gomock.Any(), gomock.Any()).DoAndReturn( func( _ context.Context, @@ -2677,7 +2669,7 @@ func (s *workflowHandlerSuite) TestListBatchOperations_InvalidRerquest() { } func (s *workflowHandlerSuite) newConfig() *Config { - return NewConfig(dc.NewCollection(dc.NewNoopClient(), s.mockResource.GetLogger()), numHistoryShards, false) + return NewConfig(dc.NewCollection(dc.NewNoopClient(), s.mockResource.GetLogger()), numHistoryShards, true, false) } func updateRequest( diff --git a/service/history/configs/config.go b/service/history/configs/config.go index b320424648b..5a5d8274a23 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -48,15 +48,11 @@ type Config struct { PersistenceNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter EnablePersistencePriorityRateLimiting dynamicconfig.BoolPropertyFn - StandardVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn - StandardVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn - AdvancedVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn - AdvancedVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn - AdvancedVisibilityWritingMode dynamicconfig.StringPropertyFn - EnableWriteToSecondaryAdvancedVisibility dynamicconfig.BoolPropertyFn - EnableReadVisibilityFromES dynamicconfig.BoolPropertyFnWithNamespaceFilter - EnableReadFromSecondaryAdvancedVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter - VisibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter + VisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn + VisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn + EnableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter + SecondaryVisibilityWritingMode dynamicconfig.StringPropertyFn + VisibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter EmitShardLagLog dynamicconfig.BoolPropertyFn MaxAutoResetPoints dynamicconfig.IntPropertyFnWithNamespaceFilter @@ -305,7 +301,12 @@ const ( ) // NewConfig returns new service config with default values -func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVisibilityConfigExist bool) *Config { +func NewConfig( + dc *dynamicconfig.Collection, + numberOfShards int32, + visibilityStoreConfigExist bool, + advancedVisibilityStoreConfigExist bool, +) *Config { cfg := &Config{ NumberOfShards: numberOfShards, @@ -322,15 +323,11 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, common.DefaultWorkflowTaskTimeout), ContinueAsNewMinInterval: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.ContinueAsNewMinInterval, time.Second), - StandardVisibilityPersistenceMaxReadQPS: dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxReadQPS, 9000), - StandardVisibilityPersistenceMaxWriteQPS: dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxWriteQPS, 9000), - AdvancedVisibilityPersistenceMaxReadQPS: dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxReadQPS, 9000), - AdvancedVisibilityPersistenceMaxWriteQPS: dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxWriteQPS, 9000), - AdvancedVisibilityWritingMode: dc.GetStringProperty(dynamicconfig.AdvancedVisibilityWritingMode, visibility.DefaultAdvancedVisibilityWritingMode(isAdvancedVisibilityConfigExist)), - EnableWriteToSecondaryAdvancedVisibility: dc.GetBoolProperty(dynamicconfig.EnableWriteToSecondaryAdvancedVisibility, false), - EnableReadVisibilityFromES: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableReadVisibilityFromES, isAdvancedVisibilityConfigExist), - EnableReadFromSecondaryAdvancedVisibility: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableReadFromSecondaryAdvancedVisibility, false), - VisibilityDisableOrderByClause: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.VisibilityDisableOrderByClause, false), + VisibilityPersistenceMaxReadQPS: visibility.GetVisibilityPersistenceMaxReadQPS(dc, advancedVisibilityStoreConfigExist), + VisibilityPersistenceMaxWriteQPS: visibility.GetVisibilityPersistenceMaxWriteQPS(dc, advancedVisibilityStoreConfigExist), + EnableReadFromSecondaryVisibility: visibility.GetEnableReadFromSecondaryVisibilityConfig(dc, visibilityStoreConfigExist, advancedVisibilityStoreConfigExist), + SecondaryVisibilityWritingMode: visibility.GetSecondaryVisibilityWritingModeConfig(dc, visibilityStoreConfigExist, advancedVisibilityStoreConfigExist), + VisibilityDisableOrderByClause: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.VisibilityDisableOrderByClause, false), EmitShardLagLog: dc.GetBoolProperty(dynamicconfig.EmitShardLagLog, false), HistoryCacheInitialSize: dc.GetIntProperty(dynamicconfig.HistoryCacheInitialSize, 128), diff --git a/service/history/fx.go b/service/history/fx.go index 430386c22b3..57c884118ac 100644 --- a/service/history/fx.go +++ b/service/history/fx.go @@ -164,8 +164,10 @@ func ConfigProvider( persistenceConfig config.Persistence, esConfig *esclient.Config, ) *configs.Config { - return configs.NewConfig(dc, + return configs.NewConfig( + dc, persistenceConfig.NumHistoryShards, + persistenceConfig.StandardVisibilityConfigExist(), persistenceConfig.AdvancedVisibilityConfigExist(), ) } @@ -232,7 +234,6 @@ func VisibilityManagerProvider( persistenceConfig *config.Persistence, esProcessorConfig *elasticsearch.ProcessorConfig, serviceConfig *configs.Config, - esConfig *esclient.Config, esClient esclient.Client, persistenceServiceResolver resolver.ServiceResolver, searchAttributesMapperProvider searchattribute.MapperProvider, @@ -241,20 +242,14 @@ func VisibilityManagerProvider( return visibility.NewManager( *persistenceConfig, persistenceServiceResolver, - esConfig.GetVisibilityIndex(), - esConfig.GetSecondaryVisibilityIndex(), esClient, esProcessorConfig, saProvider, searchAttributesMapperProvider, - serviceConfig.StandardVisibilityPersistenceMaxReadQPS, - serviceConfig.StandardVisibilityPersistenceMaxWriteQPS, - serviceConfig.AdvancedVisibilityPersistenceMaxReadQPS, - serviceConfig.AdvancedVisibilityPersistenceMaxWriteQPS, - serviceConfig.EnableReadVisibilityFromES, - serviceConfig.AdvancedVisibilityWritingMode, - serviceConfig.EnableReadFromSecondaryAdvancedVisibility, - serviceConfig.EnableWriteToSecondaryAdvancedVisibility, + serviceConfig.VisibilityPersistenceMaxReadQPS, + serviceConfig.VisibilityPersistenceMaxWriteQPS, + serviceConfig.EnableReadFromSecondaryVisibility, + serviceConfig.SecondaryVisibilityWritingMode, serviceConfig.VisibilityDisableOrderByClause, metricsHandler, logger, diff --git a/service/history/queue_factory_base_test.go b/service/history/queue_factory_base_test.go index 85be79fae83..c94723370e4 100644 --- a/service/history/queue_factory_base_test.go +++ b/service/history/queue_factory_base_test.go @@ -149,6 +149,7 @@ func getModuleDependencies(controller *gomock.Controller, c *moduleTestCase) fx. cfg := configs.NewConfig( dynamicconfig.NewNoopCollection(), 1, + true, false, ) archivalMetadata := getArchivalMetadata(controller, c) diff --git a/service/history/replication/stream_receiver_monitor_test.go b/service/history/replication/stream_receiver_monitor_test.go index e6a54976d85..34e0130639f 100644 --- a/service/history/replication/stream_receiver_monitor_test.go +++ b/service/history/replication/stream_receiver_monitor_test.go @@ -87,6 +87,7 @@ func (s *streamReceiverMonitorSuite) SetupTest() { Config: configs.NewConfig( dynamicconfig.NewNoopCollection(), 1, + true, false, ), ClientBean: s.clientBean, diff --git a/service/history/tests/vars.go b/service/history/tests/vars.go index 528131e9da2..9f1ea0f90b4 100644 --- a/service/history/tests/vars.go +++ b/service/history/tests/vars.go @@ -179,7 +179,7 @@ var ( func NewDynamicConfig() *configs.Config { dc := dynamicconfig.NewNoopCollection() - config := configs.NewConfig(dc, 1, false) + config := configs.NewConfig(dc, 1, true, false) // reduce the duration of long poll to increase test speed config.LongPollExpirationInterval = dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.HistoryLongPollExpirationInterval, 10*time.Second) config.EnableActivityEagerExecution = dynamicconfig.GetBoolPropertyFnFilteredByNamespace(true) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index e6ddce94b6a..ea1c3036de4 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -62,7 +62,6 @@ import ( "go.temporal.io/server/common/payload" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/versionhistory" - "go.temporal.io/server/common/persistence/visibility" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service/history/configs" @@ -1895,10 +1894,7 @@ func (ms *MutableStateImpl) addBinaryCheckSumIfNotExists( exeInfo.SearchAttributes = make(map[string]*commonpb.Payload, 1) } exeInfo.SearchAttributes[searchattribute.BinaryChecksums] = checksumsPayload - if ms.shard.GetConfig().AdvancedVisibilityWritingMode() != visibility.SecondaryVisibilityWritingModeOff { - return ms.taskGenerator.GenerateUpsertVisibilityTask() - } - return nil + return ms.taskGenerator.GenerateUpsertVisibilityTask() } // TODO: we will release the restriction when reset API allow those pending diff --git a/service/history/workflow/task_refresher.go b/service/history/workflow/task_refresher.go index 5b9bb7b9519..43740360b04 100644 --- a/service/history/workflow/task_refresher.go +++ b/service/history/workflow/task_refresher.go @@ -36,7 +36,6 @@ import ( "go.temporal.io/server/common" "go.temporal.io/server/common/log" "go.temporal.io/server/common/namespace" - "go.temporal.io/server/common/persistence/visibility" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/events" @@ -153,15 +152,7 @@ func (r *TaskRefresherImpl) RefreshTasks( return err } - if r.config.AdvancedVisibilityWritingMode() != visibility.SecondaryVisibilityWritingModeOff { - if err := r.refreshTasksForWorkflowSearchAttr( - taskGenerator, - ); err != nil { - return err - } - } - - return nil + return r.refreshTasksForWorkflowSearchAttr(taskGenerator) } func (r *TaskRefresherImpl) refreshTasksForWorkflowStart( diff --git a/service/worker/fx.go b/service/worker/fx.go index a1557b5755d..649a182d20c 100644 --- a/service/worker/fx.go +++ b/service/worker/fx.go @@ -88,6 +88,7 @@ func ConfigProvider( return NewConfig( dc, persistenceConfig, + persistenceConfig.StandardVisibilityConfigExist(), persistenceConfig.AdvancedVisibilityConfigExist(), ) } @@ -97,7 +98,6 @@ func VisibilityManagerProvider( metricsHandler metrics.Handler, persistenceConfig *config.Persistence, serviceConfig *Config, - esConfig *esclient.Config, esClient esclient.Client, persistenceServiceResolver resolver.ServiceResolver, searchAttributesMapperProvider searchattribute.MapperProvider, @@ -106,20 +106,14 @@ func VisibilityManagerProvider( return visibility.NewManager( *persistenceConfig, persistenceServiceResolver, - esConfig.GetVisibilityIndex(), - esConfig.GetSecondaryVisibilityIndex(), esClient, nil, // worker visibility never write saProvider, searchAttributesMapperProvider, - serviceConfig.StandardVisibilityPersistenceMaxReadQPS, - serviceConfig.StandardVisibilityPersistenceMaxWriteQPS, - serviceConfig.AdvancedVisibilityPersistenceMaxReadQPS, - serviceConfig.AdvancedVisibilityPersistenceMaxWriteQPS, - serviceConfig.EnableReadVisibilityFromES, + serviceConfig.VisibilityPersistenceMaxReadQPS, + serviceConfig.VisibilityPersistenceMaxWriteQPS, + serviceConfig.EnableReadFromSecondaryVisibility, dynamicconfig.GetStringPropertyFn(visibility.SecondaryVisibilityWritingModeOff), // worker visibility never write - serviceConfig.EnableReadFromSecondaryAdvancedVisibility, - dynamicconfig.GetBoolPropertyFn(false), // worker visibility never write serviceConfig.VisibilityDisableOrderByClause, metricsHandler, logger, diff --git a/service/worker/service.go b/service/worker/service.go index 46c7d7ab2a4..4d67c72f6bc 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -47,6 +47,7 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" persistenceClient "go.temporal.io/server/common/persistence/client" + "go.temporal.io/server/common/persistence/visibility" "go.temporal.io/server/common/persistence/visibility/manager" esclient "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client" "go.temporal.io/server/common/primitives" @@ -115,13 +116,10 @@ type ( PerNamespaceWorkerCount dynamicconfig.IntPropertyFnWithNamespaceFilter PerNamespaceWorkerOptions dynamicconfig.MapPropertyFnWithNamespaceFilter - StandardVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn - StandardVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn - AdvancedVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn - AdvancedVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn - EnableReadVisibilityFromES dynamicconfig.BoolPropertyFnWithNamespaceFilter - EnableReadFromSecondaryAdvancedVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter - VisibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter + VisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn + VisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn + EnableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter + VisibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter } ) @@ -187,7 +185,12 @@ func NewService( } // NewConfig builds the new Config for worker service -func NewConfig(dc *dynamicconfig.Collection, persistenceConfig *config.Persistence, enableReadFromES bool) *Config { +func NewConfig( + dc *dynamicconfig.Collection, + persistenceConfig *config.Persistence, + visibilityStoreConfigExist bool, + enableReadFromES bool, +) *Config { config := &Config{ ArchiverConfig: &archiver.Config{ MaxConcurrentActivityExecutionSize: dc.GetIntProperty( @@ -343,13 +346,10 @@ func NewConfig(dc *dynamicconfig.Collection, persistenceConfig *config.Persisten true, ), - StandardVisibilityPersistenceMaxReadQPS: dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxReadQPS, 9000), - StandardVisibilityPersistenceMaxWriteQPS: dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxWriteQPS, 9000), - AdvancedVisibilityPersistenceMaxReadQPS: dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxReadQPS, 9000), - AdvancedVisibilityPersistenceMaxWriteQPS: dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxWriteQPS, 9000), - EnableReadVisibilityFromES: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableReadVisibilityFromES, enableReadFromES), - EnableReadFromSecondaryAdvancedVisibility: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableReadFromSecondaryAdvancedVisibility, false), - VisibilityDisableOrderByClause: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.VisibilityDisableOrderByClause, false), + VisibilityPersistenceMaxReadQPS: visibility.GetVisibilityPersistenceMaxReadQPS(dc, enableReadFromES), + VisibilityPersistenceMaxWriteQPS: visibility.GetVisibilityPersistenceMaxWriteQPS(dc, enableReadFromES), + EnableReadFromSecondaryVisibility: visibility.GetEnableReadFromSecondaryVisibilityConfig(dc, visibilityStoreConfigExist, enableReadFromES), + VisibilityDisableOrderByClause: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.VisibilityDisableOrderByClause, false), } return config } diff --git a/temporal/fx.go b/temporal/fx.go index 10de3e3ddb5..bc28cabb5d5 100644 --- a/temporal/fx.go +++ b/temporal/fx.go @@ -163,7 +163,8 @@ func ServerOptionsProvider(opts []ServerOption) (serverOptionsProvider, error) { return serverOptionsProvider{}, err } - err = verifyPersistenceCompatibleVersion(so.config.Persistence, so.persistenceServiceResolver) + persistenceConfig := so.config.Persistence + err = verifyPersistenceCompatibleVersion(persistenceConfig, so.persistenceServiceResolver) if err != nil { return serverOptionsProvider{}, err } @@ -216,23 +217,27 @@ func ServerOptionsProvider(opts []ServerOption) (serverOptionsProvider, error) { // EsConfig / EsClient var esConfig *esclient.Config var esClient esclient.Client - if so.config.Persistence.AdvancedVisibilityConfigExist() { - advancedVisibilityStore, ok := so.config.Persistence.DataStores[so.config.Persistence.AdvancedVisibilityStore] - if !ok { - return serverOptionsProvider{}, fmt.Errorf("persistence config: advanced visibility datastore %q: missing config", so.config.Persistence.AdvancedVisibilityStore) - } + if persistenceConfig.StandardVisibilityConfigExist() && + persistenceConfig.DataStores[persistenceConfig.VisibilityStore].Elasticsearch != nil { + esConfig = persistenceConfig.DataStores[persistenceConfig.VisibilityStore].Elasticsearch + } else if persistenceConfig.SecondaryVisibilityConfigExist() && + persistenceConfig.DataStores[persistenceConfig.SecondaryVisibilityStore].Elasticsearch != nil { + esConfig = persistenceConfig.DataStores[persistenceConfig.SecondaryVisibilityStore].Elasticsearch + } else if persistenceConfig.AdvancedVisibilityConfigExist() { + esConfig = persistenceConfig.DataStores[persistenceConfig.AdvancedVisibilityStore].Elasticsearch + } + + if esConfig != nil { esHttpClient := so.elasticsearchHttpClient if esHttpClient == nil { var err error - esHttpClient, err = esclient.NewAwsHttpClient(advancedVisibilityStore.Elasticsearch.AWSRequestSigning) + esHttpClient, err = esclient.NewAwsHttpClient(esConfig.AWSRequestSigning) if err != nil { return serverOptionsProvider{}, fmt.Errorf("unable to create AWS HTTP client for Elasticsearch: %w", err) } } - esConfig = advancedVisibilityStore.Elasticsearch - esClient, err = esclient.NewClient(esConfig, esHttpClient, logger) if err != nil { return serverOptionsProvider{}, fmt.Errorf("unable to create Elasticsearch client: %w", err) @@ -638,13 +643,17 @@ func ApplyClusterMetadataConfigProvider( } defer clusterMetadataManager.Close() - var indexName string - var initialIndexSearchAttributes map[string]*persistencespb.IndexSearchAttributes - if config.Persistence.IsSQLVisibilityStore() { - indexName = config.Persistence.DataStores[config.Persistence.VisibilityStore].SQL.DatabaseName - initialIndexSearchAttributes = map[string]*persistencespb.IndexSearchAttributes{ - indexName: searchattribute.GetSqlDbIndexSearchAttributes(), - } + var sqlIndexNames []string + initialIndexSearchAttributes := make(map[string]*persistencespb.IndexSearchAttributes) + if ds := config.Persistence.GetVisibilityStoreConfig(); ds.SQL != nil { + indexName := ds.GetIndexName() + sqlIndexNames = append(sqlIndexNames, indexName) + initialIndexSearchAttributes[indexName] = searchattribute.GetSqlDbIndexSearchAttributes() + } + if ds := config.Persistence.GetSecondaryVisibilityStoreConfig(); ds.SQL != nil { + indexName := ds.GetIndexName() + sqlIndexNames = append(sqlIndexNames, indexName) + initialIndexSearchAttributes[indexName] = searchattribute.GetSqlDbIndexSearchAttributes() } clusterData := config.ClusterMetadata @@ -706,14 +715,18 @@ func ApplyClusterMetadataConfigProvider( // TODO (rodrigozhou): Remove this block for v1.21. // Handle registering custom search attributes when upgrading to v1.20. - if config.Persistence.IsSQLVisibilityStore() { + if len(sqlIndexNames) > 0 { needSave := false if currentMetadata.IndexSearchAttributes == nil { currentMetadata.IndexSearchAttributes = initialIndexSearchAttributes needSave = true - } else if _, ok := currentMetadata.IndexSearchAttributes[indexName]; !ok { - currentMetadata.IndexSearchAttributes[indexName] = searchattribute.GetSqlDbIndexSearchAttributes() - needSave = true + } else { + for _, indexName := range sqlIndexNames { + if _, ok := currentMetadata.IndexSearchAttributes[indexName]; !ok { + currentMetadata.IndexSearchAttributes[indexName] = searchattribute.GetSqlDbIndexSearchAttributes() + needSave = true + } + } } if needSave {