From a3e599c617f81e5d8cebe567ae7cc87e9d8faddf Mon Sep 17 00:00:00 2001 From: rodrigozhou Date: Thu, 23 Mar 2023 14:54:54 -0800 Subject: [PATCH] New dynamic configs for visibility --- common/config/config.go | 2 + common/config/persistence.go | 156 ++++++++- common/dynamicconfig/collection.go | 5 + common/dynamicconfig/constants.go | 10 + .../visibility_persistence_suite_test.go | 7 +- common/persistence/visibility/defs.go | 82 +++++ common/persistence/visibility/factory.go | 297 ++++++++---------- .../visibility/manager_selector.go | 77 ++--- .../store/elasticsearch/client/config.go | 11 +- service/frontend/dcRedirectionPolicy_test.go | 2 +- service/frontend/fx.go | 14 +- .../frontend/redirection_interceptor_test.go | 1 + service/frontend/service.go | 38 +-- service/frontend/workflow_handler.go | 5 +- service/frontend/workflow_handler_test.go | 26 +- service/history/configs/config.go | 36 +-- service/history/fx.go | 19 +- service/history/queue_factory_base_test.go | 1 + service/history/tests/vars.go | 2 +- .../history/workflow/mutable_state_impl.go | 6 +- service/history/workflow/task_refresher.go | 11 +- service/worker/fx.go | 14 +- service/worker/service.go | 30 +- temporal/fx.go | 53 ++-- 24 files changed, 523 insertions(+), 382 deletions(-) diff --git a/common/config/config.go b/common/config/config.go index c64859904178..7183ab8913bb 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -230,6 +230,8 @@ type ( DefaultStore string `yaml:"defaultStore" validate:"nonzero"` // VisibilityStore is the name of the datastore to be used for visibility records VisibilityStore string `yaml:"visibilityStore"` + // SecondaryVisibilityStore is the name of the secondary datastore to be used for visibility records + SecondaryVisibilityStore string `yaml:"secondaryVisibilityStore"` // AdvancedVisibilityStore is the name of the datastore to be used for visibility records AdvancedVisibilityStore string `yaml:"advancedVisibilityStore"` // NumHistoryShards is the desired number of history shards. This config doesn't diff --git a/common/config/persistence.go b/common/config/persistence.go index 31d07905dcd2..cc1f7da17d05 100644 --- a/common/config/persistence.go +++ b/common/config/persistence.go @@ -31,6 +31,7 @@ import ( "strings" "github.com/gocql/gocql" + "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client" ) const ( @@ -54,6 +55,68 @@ func (c *Persistence) Validate() error { if c.VisibilityStore != "" { stores = append(stores, c.VisibilityStore) } + if c.SecondaryVisibilityStore != "" { + stores = append(stores, c.SecondaryVisibilityStore) + } + if c.AdvancedVisibilityStore != "" { + stores = append(stores, c.AdvancedVisibilityStore) + } + + // There are 3 config keys: + // - visibilityStore: can set any data store + // - secondaryVisibilityStore: can set any data store + // - advancedVisibilityStore: can only set elasticsearch data store + // If visibilityStore is set, then it's always the primary. + // If secondaryVisibilityStore is set, it's always the secondary. + // + // Valid dual visibility combinations (order: primary, secondary): + // - visibilityStore (standard), secondaryVisibilityStore (any) + // - visibilityStore (standard), advancedVisibilityStore (es) + // - visibilityStore (advanced sql), secondaryVisibilityStore (advanced sql) + // - visibilityStore (es), visibilityStore (es) [via elasticsearch.indices config] + // - advancedVisibilityStore (es), advancedVisibilityStore (es) [via elasticsearch.indices config] + // + // Invalid dual visibility combinations: + // - visibilityStore (advanced sql), secondaryVisibilityStore (standard, es) + // - visibilityStore (advanced sql), advancedVisibilityStore (es) + // - visibilityStore (es), secondaryVisibilityStore (any) + // - visibilityStore (es), advancedVisibilityStore (es) + // - advancedVisibilityStore (es), secondaryVisibilityStore (any) + // + // The validation for dual visibility pair (advanced sql, advanced sql) is in visibility factory + // due to circular dependency. This will be better after standard visibility is removed. + + if c.VisibilityStore == "" && c.AdvancedVisibilityStore == "" { + return errors.New("persistence config: visibilityStore must be specified") + } + if c.SecondaryVisibilityStore != "" && c.AdvancedVisibilityStore != "" { + return errors.New( + "persistence config: cannot specify both secondaryVisibilityStore and " + + "advancedVisibilityStore", + ) + } + if c.AdvancedVisibilityStore != "" && c.DataStores[c.AdvancedVisibilityStore].Elasticsearch == nil { + return fmt.Errorf( + "persistence config: advanced visibility datastore %q: missing elasticsearch config", + c.AdvancedVisibilityStore, + ) + } + if c.DataStores[c.VisibilityStore].Elasticsearch != nil && + (c.SecondaryVisibilityStore != "" || c.AdvancedVisibilityStore != "") { + return errors.New( + "persistence config: cannot set secondaryVisibilityStore or advancedVisibilityStore " + + "when visibilityStore is setting elasticsearch datastore", + ) + } + if c.DataStores[c.SecondaryVisibilityStore].Elasticsearch.GetSecondaryVisibilityIndex() != "" { + return fmt.Errorf( + "persistence config: secondary visibility datastore %q: elasticsearch config: "+ + "cannot set secondary_visibility", + c.SecondaryVisibilityStore, + ) + } + + cntEsConfigs := 0 for _, st := range stores { ds, ok := c.DataStores[st] if !ok { @@ -62,10 +125,17 @@ func (c *Persistence) Validate() error { if err := ds.Validate(); err != nil { return fmt.Errorf("persistence config: datastore %q: %s", st, err.Error()) } + if ds.Elasticsearch != nil { + cntEsConfigs++ + } } - if err := c.validateAdvancedVisibility(); err != nil { - return err + if cntEsConfigs > 1 { + return fmt.Errorf( + "persistence config: cannot have more than one Elasticsearch visibility store config " + + "(use `elasticsearch.indices.secondary_visibility` config key if you need to set a " + + "secondary Elasticsearch visibility store)", + ) } return nil @@ -76,34 +146,75 @@ func (c *Persistence) StandardVisibilityConfigExist() bool { return c.VisibilityStore != "" } +// SecondaryVisibilityConfigExist returns whether user specified secondaryVisibilityStore in config +func (c *Persistence) SecondaryVisibilityConfigExist() bool { + return c.SecondaryVisibilityStore != "" +} + // AdvancedVisibilityConfigExist returns whether user specified advancedVisibilityStore in config func (c *Persistence) AdvancedVisibilityConfigExist() bool { return c.AdvancedVisibilityStore != "" } func (c *Persistence) IsSQLVisibilityStore() bool { - return c.StandardVisibilityConfigExist() && c.DataStores[c.VisibilityStore].SQL != nil + return (c.StandardVisibilityConfigExist() && c.DataStores[c.VisibilityStore].SQL != nil) || + (c.SecondaryVisibilityConfigExist() && c.DataStores[c.SecondaryVisibilityStore].SQL != nil) } -func (c *Persistence) validateAdvancedVisibility() error { - if !c.StandardVisibilityConfigExist() && !c.AdvancedVisibilityConfigExist() { - return errors.New("persistence config: one of visibilityStore or advancedVisibilityStore must be specified") +func (c *Persistence) GetVisibilityStoreConfig() DataStore { + if c.VisibilityStore != "" { + return c.DataStores[c.VisibilityStore] } - - if !c.AdvancedVisibilityConfigExist() { - return nil + if c.AdvancedVisibilityStore != "" { + return c.DataStores[c.AdvancedVisibilityStore] } + // Based on validation above, this should never happen. + return DataStore{} +} - advancedVisibilityDataStore, ok := c.DataStores[c.AdvancedVisibilityStore] - if !ok { - return fmt.Errorf("persistence config: advanced visibility datastore %q: missing config", c.AdvancedVisibilityStore) +func (c *Persistence) GetSecondaryVisibilityStoreConfig() DataStore { + if c.SecondaryVisibilityStore != "" { + return c.DataStores[c.SecondaryVisibilityStore] } - - if err := advancedVisibilityDataStore.Elasticsearch.Validate(c.AdvancedVisibilityStore); err != nil { - return err + if c.VisibilityStore != "" { + if c.AdvancedVisibilityStore != "" { + return c.DataStores[c.AdvancedVisibilityStore] + } + ds := c.DataStores[c.VisibilityStore] + if ds.Elasticsearch != nil && ds.Elasticsearch.GetSecondaryVisibilityIndex() != "" { + esConfig := *ds.Elasticsearch + esConfig.Indices = map[string]string{ + client.VisibilityAppName: ds.Elasticsearch.GetSecondaryVisibilityIndex(), + } + ds.Elasticsearch = &esConfig + return ds + } + } + if c.AdvancedVisibilityStore != "" { + ds := c.DataStores[c.AdvancedVisibilityStore] + if ds.Elasticsearch != nil && ds.Elasticsearch.GetSecondaryVisibilityIndex() != "" { + esConfig := *ds.Elasticsearch + esConfig.Indices = map[string]string{ + client.VisibilityAppName: ds.Elasticsearch.GetSecondaryVisibilityIndex(), + } + ds.Elasticsearch = &esConfig + return ds + } } + return DataStore{} +} - return nil +func (ds *DataStore) GetIndexName() string { + switch { + case ds.SQL != nil: + return ds.SQL.DatabaseName + case ds.Cassandra != nil: + return ds.Cassandra.Keyspace + case ds.Elasticsearch != nil: + return ds.Elasticsearch.GetVisibilityIndex() + default: + return "" + } } // Validate validates the data store config @@ -118,8 +229,14 @@ func (ds *DataStore) Validate() error { if ds.CustomDataStoreConfig != nil { storeConfigCount++ } + if ds.Elasticsearch != nil { + storeConfigCount++ + } if storeConfigCount != 1 { - return errors.New("must provide config for one and only one for DataStore of cassandra or sql or custom stores") + return errors.New( + "must provide config for one and only one DataStore: " + + "elasticsearch, cassandra, sql or custom stores", + ) } if ds.SQL != nil && ds.SQL.TaskScanPartitions == 0 { @@ -130,6 +247,11 @@ func (ds *DataStore) Validate() error { return err } } + if ds.Elasticsearch != nil { + if err := ds.Elasticsearch.Validate(); err != nil { + return err + } + } return nil } diff --git a/common/dynamicconfig/collection.go b/common/dynamicconfig/collection.go index 3e8d18849a55..b1b8a564c757 100644 --- a/common/dynamicconfig/collection.go +++ b/common/dynamicconfig/collection.go @@ -406,6 +406,11 @@ func (c *Collection) GetTaskQueuePartitionsProperty(key Key) IntPropertyFnWithTa return c.GetIntPropertyFilteredByTaskQueueInfo(key, defaultNumTaskQueuePartitions) } +func (c *Collection) HasKey(key Key) bool { + cvs := c.client.GetValue(key) + return len(cvs) > 0 +} + func findMatch(cvs, defaultCVs []ConstrainedValue, precedence []Constraints) (any, error) { if len(cvs)+len(defaultCVs) == 0 { return nil, errKeyNotPresent diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index a602eb515a32..d5a6949ac946 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -38,6 +38,7 @@ const ( // keys for system + // DEPRECATED: the following block of configs are deprecated and replaced by the next block of configs // StandardVisibilityPersistenceMaxReadQPS is the max QPC system host can query standard visibility DB (SQL or Cassandra) for read. StandardVisibilityPersistenceMaxReadQPS = "system.standardVisibilityPersistenceMaxReadQPS" // StandardVisibilityPersistenceMaxWriteQPS is the max QPC system host can query standard visibility DB (SQL or Cassandra) for write. @@ -54,6 +55,15 @@ const ( EnableReadVisibilityFromES = "system.enableReadVisibilityFromES" // EnableReadFromSecondaryAdvancedVisibility is the config to enable read from secondary Elasticsearch EnableReadFromSecondaryAdvancedVisibility = "system.enableReadFromSecondaryAdvancedVisibility" + + // VisibilityPersistenceMaxReadQPS is the max QPC system host can query visibility DB for read. + VisibilityPersistenceMaxReadQPS = "system.visibilityPersistenceMaxReadQPS" + // VisibilityPersistenceMaxWriteQPS is the max QPC system host can query visibility DB for write. + VisibilityPersistenceMaxWriteQPS = "system.visibilityPersistenceMaxWriteQPS" + // EnableReadFromSecondaryVisibility is the config to enable read from secondary visibility + EnableReadFromSecondaryVisibility = "system.enableReadFromSecondaryVisibility" + // SecondaryVisibilityWritingMode is key for how to write to secondary visibility + SecondaryVisibilityWritingMode = "system.secondaryVisibilityWritingMode" // VisibilityDisableOrderByClause is the config to disable ORDERY BY clause for Elasticsearch VisibilityDisableOrderByClause = "system.visibilityDisableOrderByClause" diff --git a/common/persistence/tests/visibility_persistence_suite_test.go b/common/persistence/tests/visibility_persistence_suite_test.go index 25a340a26626..f80c24becfc0 100644 --- a/common/persistence/tests/visibility_persistence_suite_test.go +++ b/common/persistence/tests/visibility_persistence_suite_test.go @@ -78,13 +78,18 @@ func (s *VisibilityPersistenceSuite) SetupSuite() { s.controller = gomock.NewController(s.T()) s.SearchAttributesProvider = searchattribute.NewTestProvider() s.SearchAttributesMapperProvider = searchattribute.NewTestMapperProvider(nil) - s.VisibilityMgr, err = visibility.NewStandardManager( + s.VisibilityMgr, err = visibility.NewManager( cfg, resolver.NewNoopResolver(), + nil, + nil, s.SearchAttributesProvider, s.SearchAttributesMapperProvider, dynamicconfig.GetIntPropertyFn(1000), dynamicconfig.GetIntPropertyFn(1000), + dynamicconfig.GetBoolPropertyFnFilteredByNamespace(false), + dynamicconfig.GetStringPropertyFn(visibility.SecondaryVisibilityWritingModeOff), + dynamicconfig.GetBoolPropertyFnFilteredByNamespace(false), metrics.NoopMetricsHandler, s.Logger, ) diff --git a/common/persistence/visibility/defs.go b/common/persistence/visibility/defs.go index 5d491de68ecb..00f420a10c9b 100644 --- a/common/persistence/visibility/defs.go +++ b/common/persistence/visibility/defs.go @@ -25,6 +25,7 @@ package visibility import ( + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/persistence/sql/sqlplugin/mysql" "go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql" "go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite" @@ -47,6 +48,87 @@ func DefaultAdvancedVisibilityWritingMode(advancedVisibilityConfigExist bool) st return SecondaryVisibilityWritingModeOff } +func ResolveVisibilityPersistenceMaxReadQPS( + dc *dynamicconfig.Collection, + advancedVisibilityStoreConfigExists bool, +) dynamicconfig.IntPropertyFn { + if dc.HasKey(dynamicconfig.VisibilityPersistenceMaxReadQPS) { + return dc.GetIntProperty(dynamicconfig.VisibilityPersistenceMaxReadQPS, 9000) + } + if advancedVisibilityStoreConfigExists { + return dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxReadQPS, 9000) + } + return dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxReadQPS, 9000) +} + +func ResolveVisibilityPersistenceMaxWriteQPS( + dc *dynamicconfig.Collection, + advancedVisibilityStoreConfigExists bool, +) dynamicconfig.IntPropertyFn { + if dc.HasKey(dynamicconfig.VisibilityPersistenceMaxWriteQPS) { + return dc.GetIntProperty(dynamicconfig.VisibilityPersistenceMaxWriteQPS, 9000) + } + if advancedVisibilityStoreConfigExists { + return dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxWriteQPS, 9000) + } + return dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxWriteQPS, 9000) +} + +func ResolveEnableReadFromSecondaryVisibilityConfig( + dc *dynamicconfig.Collection, + visibilityStoreConfigExists bool, + advancedVisibilityStoreConfigExists bool, +) dynamicconfig.BoolPropertyFnWithNamespaceFilter { + if dc.HasKey(dynamicconfig.EnableReadFromSecondaryVisibility) { + return dc.GetBoolPropertyFnWithNamespaceFilter( + dynamicconfig.EnableReadFromSecondaryVisibility, + false, + ) + } + if visibilityStoreConfigExists && advancedVisibilityStoreConfigExists { + return dc.GetBoolPropertyFnWithNamespaceFilter( + dynamicconfig.EnableReadVisibilityFromES, + advancedVisibilityStoreConfigExists, + ) + } + if advancedVisibilityStoreConfigExists { + return dc.GetBoolPropertyFnWithNamespaceFilter( + dynamicconfig.EnableReadFromSecondaryAdvancedVisibility, + false, + ) + } + return dynamicconfig.GetBoolPropertyFnFilteredByNamespace(false) +} + +func ResolveSecondaryVisibilityWritingModeConfig( + dc *dynamicconfig.Collection, + visibilityStoreConfigExists bool, + advancedVisibilityStoreConfigExists bool, +) dynamicconfig.StringPropertyFn { + if dc.HasKey(dynamicconfig.SecondaryVisibilityWritingMode) { + return dc.GetStringProperty( + dynamicconfig.SecondaryVisibilityWritingMode, + SecondaryVisibilityWritingModeOff, + ) + } + if visibilityStoreConfigExists && advancedVisibilityStoreConfigExists { + return dc.GetStringProperty( + dynamicconfig.AdvancedVisibilityWritingMode, + DefaultAdvancedVisibilityWritingMode(advancedVisibilityStoreConfigExists), + ) + } + if advancedVisibilityStoreConfigExists { + enableWriteToSecondaryAdvancedVisibility := dc.GetBoolProperty( + dynamicconfig.EnableWriteToSecondaryAdvancedVisibility, + false, + ) + if enableWriteToSecondaryAdvancedVisibility() { + return dynamicconfig.GetStringPropertyFn(SecondaryVisibilityWritingModeDual) + } + } + return dynamicconfig.GetStringPropertyFn(SecondaryVisibilityWritingModeOff) +} + func AllowListForValidation(storeNames []string) bool { if len(storeNames) == 0 { return false diff --git a/common/persistence/visibility/factory.go b/common/persistence/visibility/factory.go index e2daee35e061..7647063016fb 100644 --- a/common/persistence/visibility/factory.go +++ b/common/persistence/visibility/factory.go @@ -48,48 +48,29 @@ func NewManager( persistenceCfg config.Persistence, persistenceResolver resolver.ServiceResolver, - defaultIndexName string, - secondaryVisibilityIndexName string, esClient esclient.Client, esProcessorConfig *elasticsearch.ProcessorConfig, searchAttributesProvider searchattribute.Provider, searchAttributesMapperProvider searchattribute.MapperProvider, - standardVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn, - standardVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn, - advancedVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn, - advancedVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn, - enableAdvancedVisibilityRead dynamicconfig.BoolPropertyFnWithNamespaceFilter, - advancedVisibilityWritingMode dynamicconfig.StringPropertyFn, - enableReadFromSecondaryAdvancedVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter, - enableWriteToSecondaryAdvancedVisibility dynamicconfig.BoolPropertyFn, + maxReadQPS dynamicconfig.IntPropertyFn, + maxWriteQPS dynamicconfig.IntPropertyFn, + enableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter, + secondaryVisibilityWritingMode dynamicconfig.StringPropertyFn, visibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter, metricsHandler metrics.Handler, logger log.Logger, ) (manager.VisibilityManager, error) { - stdVisibilityManager, err := NewStandardManager( - persistenceCfg, + visibilityManager, err := newVisibilityManagerFromDataStoreConfig( + persistenceCfg.GetVisibilityStoreConfig(), persistenceResolver, - searchAttributesProvider, - searchAttributesMapperProvider, - standardVisibilityPersistenceMaxReadQPS, - standardVisibilityPersistenceMaxWriteQPS, - metricsHandler, - logger, - ) - if err != nil { - return nil, err - } - - advVisibilityManager, err := NewAdvancedManager( - defaultIndexName, esClient, esProcessorConfig, searchAttributesProvider, searchAttributesMapperProvider, - advancedVisibilityPersistenceMaxReadQPS, - advancedVisibilityPersistenceMaxWriteQPS, + maxReadQPS, + maxWriteQPS, visibilityDisableOrderByClause, metricsHandler, logger, @@ -97,15 +78,20 @@ func NewManager( if err != nil { return nil, err } + if visibilityManager == nil { + logger.Fatal("invalid config: visibility store must be configured") + return nil, nil + } - secondaryVisibilityManager, err := NewAdvancedManager( - secondaryVisibilityIndexName, + secondaryVisibilityManager, err := newVisibilityManagerFromDataStoreConfig( + persistenceCfg.GetSecondaryVisibilityStoreConfig(), + persistenceResolver, esClient, esProcessorConfig, searchAttributesProvider, searchAttributesMapperProvider, - advancedVisibilityPersistenceMaxReadQPS, - advancedVisibilityPersistenceMaxWriteQPS, + maxReadQPS, + maxWriteQPS, visibilityDisableOrderByClause, metricsHandler, logger, @@ -114,120 +100,36 @@ func NewManager( return nil, err } - if stdVisibilityManager == nil && advVisibilityManager == nil { - logger.Fatal("invalid config: one of standard or advanced visibility must be configured") - return nil, nil - } - - if stdVisibilityManager != nil && secondaryVisibilityManager != nil { - logger.Fatal("invalid config: secondary visibility store cannot be used with standard visibility") - return nil, nil - } - - if stdVisibilityManager != nil && advVisibilityManager == nil { - return stdVisibilityManager, nil - } - - if stdVisibilityManager == nil && advVisibilityManager != nil { - if secondaryVisibilityManager == nil { - return advVisibilityManager, nil + if secondaryVisibilityManager != nil { + isPrimaryAdvancedSQL := false + isSecondaryAdvancedSQL := false + switch visibilityManager.GetStoreNames()[0] { + case mysql.PluginNameV8, postgresql.PluginNameV12, sqlite.PluginName: + isPrimaryAdvancedSQL = true + } + switch secondaryVisibilityManager.GetStoreNames()[0] { + case mysql.PluginNameV8, postgresql.PluginNameV12, sqlite.PluginName: + isSecondaryAdvancedSQL = true + } + if isPrimaryAdvancedSQL && !isSecondaryAdvancedSQL { + logger.Fatal("invalid config: dual visibility combination not supported") + return nil, nil } - // Dual write to primary and secondary ES indices. - managerSelector := NewESManagerSelector( - advVisibilityManager, + managerSelector := NewDefaultManagerSelector( + visibilityManager, secondaryVisibilityManager, - enableReadFromSecondaryAdvancedVisibility, - enableWriteToSecondaryAdvancedVisibility) - + enableReadFromSecondaryVisibility, + secondaryVisibilityWritingMode, + ) return NewVisibilityManagerDual( - advVisibilityManager, + visibilityManager, secondaryVisibilityManager, managerSelector, ), nil } - // Dual write to standard and advanced visibility. - managerSelector := NewSQLToESManagerSelector( - stdVisibilityManager, - advVisibilityManager, - enableAdvancedVisibilityRead, - advancedVisibilityWritingMode) - return NewVisibilityManagerDual( - stdVisibilityManager, - advVisibilityManager, - managerSelector, - ), nil -} - -func NewStandardManager( - persistenceCfg config.Persistence, - persistenceResolver resolver.ServiceResolver, - searchAttributesProvider searchattribute.Provider, - searchAttributesMapperProvider searchattribute.MapperProvider, - - standardVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn, - standardVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn, - - metricsHandler metrics.Handler, - logger log.Logger, -) (manager.VisibilityManager, error) { - - stdVisibilityStore, err := newStandardVisibilityStore( - persistenceCfg, - persistenceResolver, - searchAttributesProvider, - searchAttributesMapperProvider, - logger) - if err != nil { - return nil, err - } - - return newVisibilityManager( - stdVisibilityStore, - standardVisibilityPersistenceMaxReadQPS, - standardVisibilityPersistenceMaxWriteQPS, - metricsHandler, - metrics.StandardVisibilityTypeTag(), - logger), nil -} - -func NewAdvancedManager( - defaultIndexName string, - esClient esclient.Client, - esProcessorConfig *elasticsearch.ProcessorConfig, - searchAttributesProvider searchattribute.Provider, - searchAttributesMapperProvider searchattribute.MapperProvider, - - advancedVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn, - advancedVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn, - visibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter, - - metricsHandler metrics.Handler, - logger log.Logger, -) (manager.VisibilityManager, error) { - if defaultIndexName == "" { - return nil, nil - } - - advVisibilityStore := newAdvancedVisibilityStore( - defaultIndexName, - esClient, - esProcessorConfig, - searchAttributesProvider, - searchAttributesMapperProvider, - visibilityDisableOrderByClause, - metricsHandler, - logger) - - return newVisibilityManager( - advVisibilityStore, - advancedVisibilityPersistenceMaxReadQPS, - advancedVisibilityPersistenceMaxWriteQPS, - metricsHandler, - metrics.AdvancedVisibilityTypeTag(), - logger, - ), nil + return visibilityManager, nil } func newVisibilityManager( @@ -259,62 +161,129 @@ func newVisibilityManager( return manager } -func newStandardVisibilityStore( - persistenceCfg config.Persistence, +func newVisibilityManagerFromDataStoreConfig( + dsConfig config.DataStore, persistenceResolver resolver.ServiceResolver, + + esClient esclient.Client, + esProcessorConfig *elasticsearch.ProcessorConfig, searchAttributesProvider searchattribute.Provider, searchAttributesMapperProvider searchattribute.MapperProvider, + + maxReadQPS dynamicconfig.IntPropertyFn, + maxWriteQPS dynamicconfig.IntPropertyFn, + visibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter, + + metricsHandler metrics.Handler, logger log.Logger, -) (store.VisibilityStore, error) { - // If standard visibility is not configured. - if persistenceCfg.VisibilityStore == "" { +) (manager.VisibilityManager, error) { + store, err := newVisibilityStoreFromDataStoreConfig( + dsConfig, + persistenceResolver, + esClient, + esProcessorConfig, + searchAttributesProvider, + searchAttributesMapperProvider, + visibilityDisableOrderByClause, + metricsHandler, + logger, + ) + if err != nil { + return nil, err + } + if store == nil { return nil, nil } + return newVisibilityManager( + store, + maxReadQPS, + maxWriteQPS, + metricsHandler, + metrics.AdvancedVisibilityTypeTag(), + logger, + ), nil +} + +func newVisibilityStoreFromDataStoreConfig( + dsConfig config.DataStore, + persistenceResolver resolver.ServiceResolver, - visibilityStoreCfg := persistenceCfg.DataStores[persistenceCfg.VisibilityStore] + esClient esclient.Client, + esProcessorConfig *elasticsearch.ProcessorConfig, + searchAttributesProvider searchattribute.Provider, + searchAttributesMapperProvider searchattribute.MapperProvider, + visibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter, + metricsHandler metrics.Handler, + logger log.Logger, +) (store.VisibilityStore, error) { var ( - visStore store.VisibilityStore - isStandard bool - err error + store store.VisibilityStore + err error ) - switch { - case visibilityStoreCfg.Cassandra != nil: - visStore, err = cassandra.NewVisibilityStore(*visibilityStoreCfg.Cassandra, persistenceResolver, logger) - isStandard = true - case visibilityStoreCfg.SQL != nil: - switch visibilityStoreCfg.SQL.PluginName { + if dsConfig.SQL != nil { + switch dsConfig.SQL.PluginName { case mysql.PluginNameV8, postgresql.PluginNameV12, sqlite.PluginName: - isStandard = false - visStore, err = sql.NewSQLVisibilityStore( - *visibilityStoreCfg.SQL, + store, err = sql.NewSQLVisibilityStore( + *dsConfig.SQL, persistenceResolver, searchAttributesProvider, searchAttributesMapperProvider, logger, ) default: - isStandard = true - visStore, err = standardSql.NewSQLVisibilityStore(*visibilityStoreCfg.SQL, persistenceResolver, logger) + store, err = newStandardVisibilityStore(dsConfig, persistenceResolver, logger) } + } else if dsConfig.Cassandra != nil { + store, err = newStandardVisibilityStore(dsConfig, persistenceResolver, logger) + } else if dsConfig.Elasticsearch != nil { + store = newElasticsearchVisibilityStore( + dsConfig.Elasticsearch.GetVisibilityIndex(), + esClient, + esProcessorConfig, + searchAttributesProvider, + searchAttributesMapperProvider, + visibilityDisableOrderByClause, + metricsHandler, + logger, + ) } + return store, err +} +func newStandardVisibilityStore( + dsConfig config.DataStore, + persistenceResolver resolver.ServiceResolver, + logger log.Logger, +) (store.VisibilityStore, error) { + var ( + store store.VisibilityStore + err error + ) + if dsConfig.Cassandra != nil { + store, err = cassandra.NewVisibilityStore( + *dsConfig.Cassandra, + persistenceResolver, + logger, + ) + } else if dsConfig.SQL != nil { + store, err = standardSql.NewSQLVisibilityStore( + *dsConfig.SQL, + persistenceResolver, + logger, + ) + } if err != nil { return nil, err } - - if visStore == nil { + if store == nil { logger.Fatal("invalid config: one of cassandra or sql params must be specified for visibility store") return nil, nil } - - if isStandard { - return standard.NewVisibilityStore(visStore), nil - } - return visStore, nil + return standard.NewVisibilityStore(store), nil } -func newAdvancedVisibilityStore( +func newElasticsearchVisibilityStore( defaultIndexName string, esClient esclient.Client, esProcessorConfig *elasticsearch.ProcessorConfig, diff --git a/common/persistence/visibility/manager_selector.go b/common/persistence/visibility/manager_selector.go index c917e3d7aaf3..9470df7555dd 100644 --- a/common/persistence/visibility/manager_selector.go +++ b/common/persistence/visibility/manager_selector.go @@ -40,82 +40,47 @@ type ( writeManagers() ([]manager.VisibilityManager, error) } - sqlToESManagerSelector struct { - enableAdvancedVisibilityRead dynamicconfig.BoolPropertyFnWithNamespaceFilter - advancedVisibilityWritingMode dynamicconfig.StringPropertyFn - stdVisibilityManager manager.VisibilityManager - advVisibilityManager manager.VisibilityManager - } - - esManagerSelector struct { - enableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter - enableWriteToSecondaryVisibility dynamicconfig.BoolPropertyFn + defaultManagerSelector struct { visibilityManager manager.VisibilityManager secondaryVisibilityManager manager.VisibilityManager + enableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter + secondaryVisibilityWritingMode dynamicconfig.StringPropertyFn } ) -var _ managerSelector = (*sqlToESManagerSelector)(nil) -var _ managerSelector = (*esManagerSelector)(nil) - -func NewSQLToESManagerSelector( - stdVisibilityManager manager.VisibilityManager, - advVisibilityManager manager.VisibilityManager, - enableAdvancedVisibilityRead dynamicconfig.BoolPropertyFnWithNamespaceFilter, - advancedVisibilityWritingMode dynamicconfig.StringPropertyFn, -) *sqlToESManagerSelector { - return &sqlToESManagerSelector{ - stdVisibilityManager: stdVisibilityManager, - advVisibilityManager: advVisibilityManager, - enableAdvancedVisibilityRead: enableAdvancedVisibilityRead, - advancedVisibilityWritingMode: advancedVisibilityWritingMode, - } -} +var _ managerSelector = (*defaultManagerSelector)(nil) -func NewESManagerSelector( +func NewDefaultManagerSelector( visibilityManager manager.VisibilityManager, secondaryVisibilityManager manager.VisibilityManager, - enableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter, - enableWriteToSecondaryVisibility dynamicconfig.BoolPropertyFn, -) *esManagerSelector { - return &esManagerSelector{ + enableSecondaryVisibilityRead dynamicconfig.BoolPropertyFnWithNamespaceFilter, + secondaryVisibilityWritingMode dynamicconfig.StringPropertyFn, +) *defaultManagerSelector { + return &defaultManagerSelector{ visibilityManager: visibilityManager, secondaryVisibilityManager: secondaryVisibilityManager, - enableReadFromSecondaryVisibility: enableReadFromSecondaryVisibility, - enableWriteToSecondaryVisibility: enableWriteToSecondaryVisibility, + enableReadFromSecondaryVisibility: enableSecondaryVisibilityRead, + secondaryVisibilityWritingMode: secondaryVisibilityWritingMode, } } -func (v *sqlToESManagerSelector) writeManagers() ([]manager.VisibilityManager, error) { - switch v.advancedVisibilityWritingMode() { +func (v *defaultManagerSelector) writeManagers() ([]manager.VisibilityManager, error) { + switch v.secondaryVisibilityWritingMode() { case SecondaryVisibilityWritingModeOff: - return []manager.VisibilityManager{v.stdVisibilityManager}, nil + return []manager.VisibilityManager{v.visibilityManager}, nil case SecondaryVisibilityWritingModeOn: - return []manager.VisibilityManager{v.advVisibilityManager}, nil + return []manager.VisibilityManager{v.secondaryVisibilityManager}, nil case SecondaryVisibilityWritingModeDual: - return []manager.VisibilityManager{v.stdVisibilityManager, v.advVisibilityManager}, nil + return []manager.VisibilityManager{v.visibilityManager, v.secondaryVisibilityManager}, nil default: - return nil, serviceerror.NewInternal(fmt.Sprintf("Unknown advanced visibility writing mode: %s", v.advancedVisibilityWritingMode())) + return nil, serviceerror.NewInternal(fmt.Sprintf( + "Unknown secondary visibility writing mode: %s", + v.secondaryVisibilityWritingMode(), + )) } } -func (v *sqlToESManagerSelector) readManager(namespace namespace.Name) manager.VisibilityManager { - if v.enableAdvancedVisibilityRead(namespace.String()) { - return v.advVisibilityManager - } - return v.stdVisibilityManager -} - -func (v *esManagerSelector) writeManagers() ([]manager.VisibilityManager, error) { - managers := []manager.VisibilityManager{v.visibilityManager} - if v.enableWriteToSecondaryVisibility() { - managers = append(managers, v.secondaryVisibilityManager) - } - - return managers, nil -} - -func (v *esManagerSelector) readManager(namespace namespace.Name) manager.VisibilityManager { +func (v *defaultManagerSelector) readManager(namespace namespace.Name) manager.VisibilityManager { if v.enableReadFromSecondaryVisibility(namespace.String()) { return v.secondaryVisibilityManager } diff --git a/common/persistence/visibility/store/elasticsearch/client/config.go b/common/persistence/visibility/store/elasticsearch/client/config.go index 07bc68368011..5880880726b1 100644 --- a/common/persistence/visibility/store/elasticsearch/client/config.go +++ b/common/persistence/visibility/store/elasticsearch/client/config.go @@ -25,6 +25,7 @@ package client import ( + "errors" "fmt" "net/url" "time" @@ -94,17 +95,15 @@ func (cfg *Config) GetSecondaryVisibilityIndex() string { return cfg.Indices[SecondaryVisibilityAppName] } -func (cfg *Config) Validate(storeName string) error { +func (cfg *Config) Validate() error { if cfg == nil { - return fmt.Errorf("persistence config: advanced visibility datastore %q: must provide config for \"elasticsearch\"", storeName) + return errors.New("elasticsearch config: config not found") } - if len(cfg.Indices) < 1 { - return fmt.Errorf("persistence config: advanced visibility datastore %q: missing indices", storeName) - + return errors.New("elasticsearch config: missing indices") } if cfg.Indices[VisibilityAppName] == "" { - return fmt.Errorf("persistence config: advanced visibility datastore %q indices configuration: missing %q key", storeName, VisibilityAppName) + return fmt.Errorf("elasticsearch config: indices configuration: missing %q key", VisibilityAppName) } return nil } diff --git a/service/frontend/dcRedirectionPolicy_test.go b/service/frontend/dcRedirectionPolicy_test.go index 5a91f20ae456..7d672b099d62 100644 --- a/service/frontend/dcRedirectionPolicy_test.go +++ b/service/frontend/dcRedirectionPolicy_test.go @@ -138,7 +138,7 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) SetupTest() { logger := log.NewTestLogger() - s.mockConfig = NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewNoopClient(), logger), 0, false) + s.mockConfig = NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewNoopClient(), logger), 0, false, false) s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes() s.policy = NewSelectedAPIsForwardingPolicy( s.currentClusterName, diff --git a/service/frontend/fx.go b/service/frontend/fx.go index 6dc04f2c9410..0d13e0cc9c15 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -220,6 +220,7 @@ func ConfigProvider( return NewConfig( dc, persistenceConfig.NumHistoryShards, + persistenceConfig.StandardVisibilityConfigExist(), persistenceConfig.AdvancedVisibilityConfigExist(), ) } @@ -390,7 +391,6 @@ func VisibilityManagerProvider( persistenceConfig *config.Persistence, metricsHandler metrics.Handler, serviceConfig *Config, - esConfig *esclient.Config, esClient esclient.Client, persistenceServiceResolver resolver.ServiceResolver, searchAttributesMapperProvider searchattribute.MapperProvider, @@ -399,20 +399,14 @@ func VisibilityManagerProvider( return visibility.NewManager( *persistenceConfig, persistenceServiceResolver, - esConfig.GetVisibilityIndex(), - esConfig.GetSecondaryVisibilityIndex(), esClient, nil, // frontend visibility never write saProvider, searchAttributesMapperProvider, - serviceConfig.StandardVisibilityPersistenceMaxReadQPS, - serviceConfig.StandardVisibilityPersistenceMaxWriteQPS, - serviceConfig.AdvancedVisibilityPersistenceMaxReadQPS, - serviceConfig.AdvancedVisibilityPersistenceMaxWriteQPS, - serviceConfig.EnableReadVisibilityFromES, + serviceConfig.VisibilityPersistenceMaxReadQPS, + serviceConfig.VisibilityPersistenceMaxWriteQPS, + serviceConfig.EnableReadFromSecondaryVisibility, dynamicconfig.GetStringPropertyFn(visibility.SecondaryVisibilityWritingModeOff), // frontend visibility never write - serviceConfig.EnableReadFromSecondaryAdvancedVisibility, - dynamicconfig.GetBoolPropertyFn(false), // frontend visibility never write serviceConfig.VisibilityDisableOrderByClause, metricsHandler, logger, diff --git a/service/frontend/redirection_interceptor_test.go b/service/frontend/redirection_interceptor_test.go index be013e31a102..d03a65795472 100644 --- a/service/frontend/redirection_interceptor_test.go +++ b/service/frontend/redirection_interceptor_test.go @@ -88,6 +88,7 @@ func (s *redirectionInterceptorSuite) SetupTest() { dynamicconfig.NewNoopCollection(), 1, false, + false, ), s.namespaceCache, config.DCRedirectionPolicy{ diff --git a/service/frontend/service.go b/service/frontend/service.go index 8c29f33e54d4..ad5a4af885ad 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -47,6 +47,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence/client" + "go.temporal.io/server/common/persistence/visibility" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/util" ) @@ -59,15 +60,12 @@ type Config struct { PersistenceNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter EnablePersistencePriorityRateLimiting dynamicconfig.BoolPropertyFn - StandardVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn - StandardVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn - AdvancedVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn - AdvancedVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn - VisibilityMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter - EnableReadVisibilityFromES dynamicconfig.BoolPropertyFnWithNamespaceFilter - EnableReadFromSecondaryAdvancedVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter - ESIndexMaxResultWindow dynamicconfig.IntPropertyFn - VisibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter + VisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn + VisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn + VisibilityMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter + EnableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter + ESIndexMaxResultWindow dynamicconfig.IntPropertyFn + VisibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter HistoryMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter RPS dynamicconfig.IntPropertyFn @@ -170,7 +168,12 @@ type Config struct { } // NewConfig returns new service config with default values -func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int32, enableReadFromES bool) *Config { +func NewConfig( + dc *dynamicconfig.Collection, + numHistoryShards int32, + visibilityStoreConfigExist bool, + enableReadFromES bool, +) *Config { return &Config{ NumHistoryShards: numHistoryShards, PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.FrontendPersistenceMaxQPS, 2000), @@ -178,15 +181,12 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int32, enableReadF PersistenceNamespaceMaxQPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendPersistenceNamespaceMaxQPS, 0), EnablePersistencePriorityRateLimiting: dc.GetBoolProperty(dynamicconfig.FrontendEnablePersistencePriorityRateLimiting, true), - StandardVisibilityPersistenceMaxReadQPS: dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxReadQPS, 9000), - StandardVisibilityPersistenceMaxWriteQPS: dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxWriteQPS, 9000), - AdvancedVisibilityPersistenceMaxReadQPS: dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxReadQPS, 9000), - AdvancedVisibilityPersistenceMaxWriteQPS: dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxWriteQPS, 9000), - VisibilityMaxPageSize: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendVisibilityMaxPageSize, 1000), - EnableReadVisibilityFromES: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableReadVisibilityFromES, enableReadFromES), - EnableReadFromSecondaryAdvancedVisibility: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableReadFromSecondaryAdvancedVisibility, false), - ESIndexMaxResultWindow: dc.GetIntProperty(dynamicconfig.FrontendESIndexMaxResultWindow, 10000), - VisibilityDisableOrderByClause: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.VisibilityDisableOrderByClause, false), + VisibilityPersistenceMaxReadQPS: visibility.ResolveVisibilityPersistenceMaxReadQPS(dc, enableReadFromES), + VisibilityPersistenceMaxWriteQPS: visibility.ResolveVisibilityPersistenceMaxWriteQPS(dc, enableReadFromES), + VisibilityMaxPageSize: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendVisibilityMaxPageSize, 1000), + EnableReadFromSecondaryVisibility: visibility.ResolveEnableReadFromSecondaryVisibilityConfig(dc, visibilityStoreConfigExist, enableReadFromES), + ESIndexMaxResultWindow: dc.GetIntProperty(dynamicconfig.FrontendESIndexMaxResultWindow, 10000), + VisibilityDisableOrderByClause: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.VisibilityDisableOrderByClause, false), HistoryMaxPageSize: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendHistoryMaxPageSize, common.GetHistoryMaxPageSize), RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 2400), diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 9003402db167..a6d305f672d4 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -76,6 +76,7 @@ import ( "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/persistence/visibility" "go.temporal.io/server/common/persistence/visibility/manager" + "go.temporal.io/server/common/persistence/visibility/store/elasticsearch" "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/rpc/interceptor" @@ -4639,8 +4640,8 @@ func (wh *WorkflowHandler) getArchivedHistory( }, nil } -func (wh *WorkflowHandler) isListRequestPageSizeTooLarge(pageSize int32, namespace string) bool { - return wh.config.EnableReadVisibilityFromES(namespace) && +func (wh *WorkflowHandler) isListRequestPageSizeTooLarge(pageSize int32, nsName string) bool { + return wh.visibilityMrg.GetReadStoreName(namespace.Name(nsName)) == elasticsearch.PersistenceName && pageSize > int32(wh.config.ESIndexMaxResultWindow()) } diff --git a/service/frontend/workflow_handler_test.go b/service/frontend/workflow_handler_test.go index 910fb2711599..9f1f74805c30 100644 --- a/service/frontend/workflow_handler_test.go +++ b/service/frontend/workflow_handler_test.go @@ -204,6 +204,7 @@ func (s *workflowHandlerSuite) TestDisableListVisibilityByFilter() { wh := s.getWorkflowHandler(config) s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(namespaceID, nil).AnyTimes() + s.mockVisibilityMgr.EXPECT().GetReadStoreName(testNamespace).Return("").AnyTimes() // test list open by wid listRequest := &workflowservice.ListOpenWorkflowExecutionsRequest{ @@ -1723,9 +1724,7 @@ func (s *workflowHandlerSuite) TestGetSearchAttributes() { } func (s *workflowHandlerSuite) TestDescribeWorkflowExecution_RunningStatus() { - config := s.newConfig() - config.EnableReadVisibilityFromES = dc.GetBoolPropertyFnFilteredByNamespace(true) - wh := s.getWorkflowHandler(config) + wh := s.getWorkflowHandler(s.newConfig()) now := timestamp.TimePtr(time.Now()) s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return( @@ -1761,9 +1760,7 @@ func (s *workflowHandlerSuite) TestDescribeWorkflowExecution_RunningStatus() { } func (s *workflowHandlerSuite) TestDescribeWorkflowExecution_CompletedStatus() { - config := s.newConfig() - config.EnableReadVisibilityFromES = dc.GetBoolPropertyFnFilteredByNamespace(true) - wh := s.getWorkflowHandler(config) + wh := s.getWorkflowHandler(s.newConfig()) now := timestamp.TimePtr(time.Now()) s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return( @@ -1800,11 +1797,10 @@ func (s *workflowHandlerSuite) TestDescribeWorkflowExecution_CompletedStatus() { func (s *workflowHandlerSuite) TestListWorkflowExecutions() { config := s.newConfig() - config.EnableReadVisibilityFromES = dc.GetBoolPropertyFnFilteredByNamespace(true) - wh := s.getWorkflowHandler(config) s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(s.testNamespaceID, nil).AnyTimes() + s.mockVisibilityMgr.EXPECT().GetReadStoreName(s.testNamespace).Return(elasticsearch.PersistenceName).AnyTimes() s.mockVisibilityMgr.EXPECT().ListWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&manager.ListWorkflowExecutionsResponse{}, nil) listRequest := &workflowservice.ListWorkflowExecutionsRequest{ @@ -1826,10 +1822,10 @@ func (s *workflowHandlerSuite) TestListWorkflowExecutions() { func (s *workflowHandlerSuite) TestScanWorkflowExecutions() { config := s.newConfig() - config.EnableReadVisibilityFromES = dc.GetBoolPropertyFnFilteredByNamespace(true) wh := s.getWorkflowHandler(config) s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(s.testNamespaceID, nil).AnyTimes() + s.mockVisibilityMgr.EXPECT().GetReadStoreName(s.testNamespace).Return(elasticsearch.PersistenceName).AnyTimes() s.mockVisibilityMgr.EXPECT().ScanWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&manager.ListWorkflowExecutionsResponse{}, nil) scanRequest := &workflowservice.ScanWorkflowExecutionsRequest{ @@ -1854,9 +1850,7 @@ func (s *workflowHandlerSuite) TestScanWorkflowExecutions() { } func (s *workflowHandlerSuite) TestCountWorkflowExecutions() { - config := s.newConfig() - config.EnableReadVisibilityFromES = dc.GetBoolPropertyFnFilteredByNamespace(true) - wh := s.getWorkflowHandler(config) + wh := s.getWorkflowHandler(s.newConfig()) s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(s.testNamespaceID, nil).AnyTimes() s.mockVisibilityMgr.EXPECT().CountWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&manager.CountWorkflowExecutionsResponse{Count: 5}, nil) @@ -1929,10 +1923,7 @@ func (s *workflowHandlerSuite) TestVerifyHistoryIsComplete() { } func (s *workflowHandlerSuite) TestGetSystemInfo() { - config := s.newConfig() - config.EnableReadVisibilityFromES = dc.GetBoolPropertyFnFilteredByNamespace(true) - - wh := s.getWorkflowHandler(config) + wh := s.getWorkflowHandler(s.newConfig()) resp, err := wh.GetSystemInfo(context.Background(), &workflowservice.GetSystemInfoRequest{}) s.NoError(err) @@ -2498,6 +2489,7 @@ func (s *workflowHandlerSuite) TestListBatchOperations() { wh := s.getWorkflowHandler(config) now := timestamp.TimePtr(time.Now()) s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(namespaceID, nil).AnyTimes() + s.mockVisibilityMgr.EXPECT().GetReadStoreName(testNamespace).Return("").AnyTimes() s.mockVisibilityMgr.EXPECT().ListWorkflowExecutions(gomock.Any(), gomock.Any()).DoAndReturn( func( _ context.Context, @@ -2549,7 +2541,7 @@ func (s *workflowHandlerSuite) TestListBatchOperations_InvalidRerquest() { } func (s *workflowHandlerSuite) newConfig() *Config { - return NewConfig(dc.NewCollection(dc.NewNoopClient(), s.mockResource.GetLogger()), numHistoryShards, false) + return NewConfig(dc.NewCollection(dc.NewNoopClient(), s.mockResource.GetLogger()), numHistoryShards, false, false) } func updateRequest( diff --git a/service/history/configs/config.go b/service/history/configs/config.go index 198c542eda18..cb2896317c6f 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -47,15 +47,11 @@ type Config struct { PersistenceNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter EnablePersistencePriorityRateLimiting dynamicconfig.BoolPropertyFn - StandardVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn - StandardVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn - AdvancedVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn - AdvancedVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn - AdvancedVisibilityWritingMode dynamicconfig.StringPropertyFn - EnableWriteToSecondaryAdvancedVisibility dynamicconfig.BoolPropertyFn - EnableReadVisibilityFromES dynamicconfig.BoolPropertyFnWithNamespaceFilter - EnableReadFromSecondaryAdvancedVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter - VisibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter + VisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn + VisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn + EnableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter + SecondaryVisibilityWritingMode dynamicconfig.StringPropertyFn + VisibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter EmitShardLagLog dynamicconfig.BoolPropertyFn MaxAutoResetPoints dynamicconfig.IntPropertyFnWithNamespaceFilter @@ -299,7 +295,13 @@ const ( ) // NewConfig returns new service config with default values -func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVisibilityConfigExist bool, defaultVisibilityIndex string) *Config { +func NewConfig( + dc *dynamicconfig.Collection, + numberOfShards int32, + visibilityStoreConfigExist bool, + advancedVisibilityStoreConfigExist bool, + defaultVisibilityIndex string, +) *Config { cfg := &Config{ NumberOfShards: numberOfShards, DefaultVisibilityIndexName: defaultVisibilityIndex, @@ -315,15 +317,11 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, common.DefaultWorkflowTaskTimeout), ContinueAsNewMinInterval: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.ContinueAsNewMinInterval, time.Second), - StandardVisibilityPersistenceMaxReadQPS: dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxReadQPS, 9000), - StandardVisibilityPersistenceMaxWriteQPS: dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxWriteQPS, 9000), - AdvancedVisibilityPersistenceMaxReadQPS: dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxReadQPS, 9000), - AdvancedVisibilityPersistenceMaxWriteQPS: dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxWriteQPS, 9000), - AdvancedVisibilityWritingMode: dc.GetStringProperty(dynamicconfig.AdvancedVisibilityWritingMode, visibility.DefaultAdvancedVisibilityWritingMode(isAdvancedVisibilityConfigExist)), - EnableWriteToSecondaryAdvancedVisibility: dc.GetBoolProperty(dynamicconfig.EnableWriteToSecondaryAdvancedVisibility, false), - EnableReadVisibilityFromES: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableReadVisibilityFromES, isAdvancedVisibilityConfigExist), - EnableReadFromSecondaryAdvancedVisibility: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableReadFromSecondaryAdvancedVisibility, false), - VisibilityDisableOrderByClause: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.VisibilityDisableOrderByClause, false), + VisibilityPersistenceMaxReadQPS: visibility.ResolveVisibilityPersistenceMaxReadQPS(dc, advancedVisibilityStoreConfigExist), + VisibilityPersistenceMaxWriteQPS: visibility.ResolveVisibilityPersistenceMaxWriteQPS(dc, advancedVisibilityStoreConfigExist), + EnableReadFromSecondaryVisibility: visibility.ResolveEnableReadFromSecondaryVisibilityConfig(dc, visibilityStoreConfigExist, advancedVisibilityStoreConfigExist), + SecondaryVisibilityWritingMode: visibility.ResolveSecondaryVisibilityWritingModeConfig(dc, visibilityStoreConfigExist, advancedVisibilityStoreConfigExist), + VisibilityDisableOrderByClause: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.VisibilityDisableOrderByClause, false), EmitShardLagLog: dc.GetBoolProperty(dynamicconfig.EmitShardLagLog, false), HistoryCacheInitialSize: dc.GetIntProperty(dynamicconfig.HistoryCacheInitialSize, 128), diff --git a/service/history/fx.go b/service/history/fx.go index 30dc30175c0b..e60358b44e9f 100644 --- a/service/history/fx.go +++ b/service/history/fx.go @@ -177,8 +177,10 @@ func ConfigProvider( } else if persistenceConfig.AdvancedVisibilityConfigExist() { indexName = esConfig.GetVisibilityIndex() } - return configs.NewConfig(dc, + return configs.NewConfig( + dc, persistenceConfig.NumHistoryShards, + persistenceConfig.StandardVisibilityConfigExist(), persistenceConfig.AdvancedVisibilityConfigExist(), indexName, ) @@ -246,7 +248,6 @@ func VisibilityManagerProvider( persistenceConfig *config.Persistence, esProcessorConfig *elasticsearch.ProcessorConfig, serviceConfig *configs.Config, - esConfig *esclient.Config, esClient esclient.Client, persistenceServiceResolver resolver.ServiceResolver, searchAttributesMapperProvider searchattribute.MapperProvider, @@ -255,20 +256,14 @@ func VisibilityManagerProvider( return visibility.NewManager( *persistenceConfig, persistenceServiceResolver, - esConfig.GetVisibilityIndex(), - esConfig.GetSecondaryVisibilityIndex(), esClient, esProcessorConfig, saProvider, searchAttributesMapperProvider, - serviceConfig.StandardVisibilityPersistenceMaxReadQPS, - serviceConfig.StandardVisibilityPersistenceMaxWriteQPS, - serviceConfig.AdvancedVisibilityPersistenceMaxReadQPS, - serviceConfig.AdvancedVisibilityPersistenceMaxWriteQPS, - serviceConfig.EnableReadVisibilityFromES, - serviceConfig.AdvancedVisibilityWritingMode, - serviceConfig.EnableReadFromSecondaryAdvancedVisibility, - serviceConfig.EnableWriteToSecondaryAdvancedVisibility, + serviceConfig.VisibilityPersistenceMaxReadQPS, + serviceConfig.VisibilityPersistenceMaxWriteQPS, + serviceConfig.EnableReadFromSecondaryVisibility, + serviceConfig.SecondaryVisibilityWritingMode, serviceConfig.VisibilityDisableOrderByClause, metricsHandler, logger, diff --git a/service/history/queue_factory_base_test.go b/service/history/queue_factory_base_test.go index 5e74ca976876..c82daeb2490f 100644 --- a/service/history/queue_factory_base_test.go +++ b/service/history/queue_factory_base_test.go @@ -150,6 +150,7 @@ func getModuleDependencies(controller *gomock.Controller, c *moduleTestCase) fx. dynamicconfig.NewNoopCollection(), 1, false, + false, "", ) archivalMetadata := getArchivalMetadata(controller, c) diff --git a/service/history/tests/vars.go b/service/history/tests/vars.go index 2ba316227d3f..b9d37e9c2f9d 100644 --- a/service/history/tests/vars.go +++ b/service/history/tests/vars.go @@ -179,7 +179,7 @@ var ( func NewDynamicConfig() *configs.Config { dc := dynamicconfig.NewNoopCollection() - config := configs.NewConfig(dc, 1, false, "") + config := configs.NewConfig(dc, 1, false, false, "") // reduce the duration of long poll to increase test speed config.LongPollExpirationInterval = dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.HistoryLongPollExpirationInterval, 10*time.Second) config.EnableActivityEagerExecution = dynamicconfig.GetBoolPropertyFnFilteredByNamespace(true) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index fb6a39307bcc..e644652404de 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -62,7 +62,6 @@ import ( "go.temporal.io/server/common/payload" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/versionhistory" - "go.temporal.io/server/common/persistence/visibility" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service/history/configs" @@ -1879,10 +1878,7 @@ func (ms *MutableStateImpl) addBinaryCheckSumIfNotExists( exeInfo.SearchAttributes = make(map[string]*commonpb.Payload, 1) } exeInfo.SearchAttributes[searchattribute.BinaryChecksums] = checksumsPayload - if ms.shard.GetConfig().AdvancedVisibilityWritingMode() != visibility.SecondaryVisibilityWritingModeOff { - return ms.taskGenerator.GenerateUpsertVisibilityTask() - } - return nil + return ms.taskGenerator.GenerateUpsertVisibilityTask() } // TODO: we will release the restriction when reset API allow those pending diff --git a/service/history/workflow/task_refresher.go b/service/history/workflow/task_refresher.go index 884bc70d413c..b8b201d7244d 100644 --- a/service/history/workflow/task_refresher.go +++ b/service/history/workflow/task_refresher.go @@ -36,7 +36,6 @@ import ( "go.temporal.io/server/common" "go.temporal.io/server/common/log" "go.temporal.io/server/common/namespace" - "go.temporal.io/server/common/persistence/visibility" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/events" @@ -153,12 +152,10 @@ func (r *TaskRefresherImpl) RefreshTasks( return err } - if r.config.AdvancedVisibilityWritingMode() != visibility.SecondaryVisibilityWritingModeOff { - if err := r.refreshTasksForWorkflowSearchAttr( - taskGenerator, - ); err != nil { - return err - } + if err := r.refreshTasksForWorkflowSearchAttr( + taskGenerator, + ); err != nil { + return err } return nil diff --git a/service/worker/fx.go b/service/worker/fx.go index a1557b5755d2..649a182d20c2 100644 --- a/service/worker/fx.go +++ b/service/worker/fx.go @@ -88,6 +88,7 @@ func ConfigProvider( return NewConfig( dc, persistenceConfig, + persistenceConfig.StandardVisibilityConfigExist(), persistenceConfig.AdvancedVisibilityConfigExist(), ) } @@ -97,7 +98,6 @@ func VisibilityManagerProvider( metricsHandler metrics.Handler, persistenceConfig *config.Persistence, serviceConfig *Config, - esConfig *esclient.Config, esClient esclient.Client, persistenceServiceResolver resolver.ServiceResolver, searchAttributesMapperProvider searchattribute.MapperProvider, @@ -106,20 +106,14 @@ func VisibilityManagerProvider( return visibility.NewManager( *persistenceConfig, persistenceServiceResolver, - esConfig.GetVisibilityIndex(), - esConfig.GetSecondaryVisibilityIndex(), esClient, nil, // worker visibility never write saProvider, searchAttributesMapperProvider, - serviceConfig.StandardVisibilityPersistenceMaxReadQPS, - serviceConfig.StandardVisibilityPersistenceMaxWriteQPS, - serviceConfig.AdvancedVisibilityPersistenceMaxReadQPS, - serviceConfig.AdvancedVisibilityPersistenceMaxWriteQPS, - serviceConfig.EnableReadVisibilityFromES, + serviceConfig.VisibilityPersistenceMaxReadQPS, + serviceConfig.VisibilityPersistenceMaxWriteQPS, + serviceConfig.EnableReadFromSecondaryVisibility, dynamicconfig.GetStringPropertyFn(visibility.SecondaryVisibilityWritingModeOff), // worker visibility never write - serviceConfig.EnableReadFromSecondaryAdvancedVisibility, - dynamicconfig.GetBoolPropertyFn(false), // worker visibility never write serviceConfig.VisibilityDisableOrderByClause, metricsHandler, logger, diff --git a/service/worker/service.go b/service/worker/service.go index 3d704a4640c2..fcb2d2f79a08 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -47,6 +47,7 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" persistenceClient "go.temporal.io/server/common/persistence/client" + "go.temporal.io/server/common/persistence/visibility" "go.temporal.io/server/common/persistence/visibility/manager" esclient "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client" "go.temporal.io/server/common/primitives" @@ -115,13 +116,10 @@ type ( PerNamespaceWorkerCount dynamicconfig.IntPropertyFnWithNamespaceFilter PerNamespaceWorkerOptions dynamicconfig.MapPropertyFnWithNamespaceFilter - StandardVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn - StandardVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn - AdvancedVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn - AdvancedVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn - EnableReadVisibilityFromES dynamicconfig.BoolPropertyFnWithNamespaceFilter - EnableReadFromSecondaryAdvancedVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter - VisibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter + VisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn + VisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn + EnableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter + VisibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter } ) @@ -187,7 +185,12 @@ func NewService( } // NewConfig builds the new Config for worker service -func NewConfig(dc *dynamicconfig.Collection, persistenceConfig *config.Persistence, enableReadFromES bool) *Config { +func NewConfig( + dc *dynamicconfig.Collection, + persistenceConfig *config.Persistence, + visibilityStoreConfigExist bool, + enableReadFromES bool, +) *Config { config := &Config{ ArchiverConfig: &archiver.Config{ MaxConcurrentActivityExecutionSize: dc.GetIntProperty( @@ -339,13 +342,10 @@ func NewConfig(dc *dynamicconfig.Collection, persistenceConfig *config.Persisten true, ), - StandardVisibilityPersistenceMaxReadQPS: dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxReadQPS, 9000), - StandardVisibilityPersistenceMaxWriteQPS: dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxWriteQPS, 9000), - AdvancedVisibilityPersistenceMaxReadQPS: dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxReadQPS, 9000), - AdvancedVisibilityPersistenceMaxWriteQPS: dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxWriteQPS, 9000), - EnableReadVisibilityFromES: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableReadVisibilityFromES, enableReadFromES), - EnableReadFromSecondaryAdvancedVisibility: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableReadFromSecondaryAdvancedVisibility, false), - VisibilityDisableOrderByClause: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.VisibilityDisableOrderByClause, false), + VisibilityPersistenceMaxReadQPS: visibility.ResolveVisibilityPersistenceMaxReadQPS(dc, enableReadFromES), + VisibilityPersistenceMaxWriteQPS: visibility.ResolveVisibilityPersistenceMaxWriteQPS(dc, enableReadFromES), + EnableReadFromSecondaryVisibility: visibility.ResolveEnableReadFromSecondaryVisibilityConfig(dc, visibilityStoreConfigExist, enableReadFromES), + VisibilityDisableOrderByClause: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.VisibilityDisableOrderByClause, false), } return config } diff --git a/temporal/fx.go b/temporal/fx.go index a94297316961..0645f4f73456 100644 --- a/temporal/fx.go +++ b/temporal/fx.go @@ -164,7 +164,8 @@ func ServerOptionsProvider(opts []ServerOption) (serverOptionsProvider, error) { return serverOptionsProvider{}, err } - err = verifyPersistenceCompatibleVersion(so.config.Persistence, so.persistenceServiceResolver) + persistenceConfig := so.config.Persistence + err = verifyPersistenceCompatibleVersion(persistenceConfig, so.persistenceServiceResolver) if err != nil { return serverOptionsProvider{}, err } @@ -222,23 +223,27 @@ func ServerOptionsProvider(opts []ServerOption) (serverOptionsProvider, error) { // EsConfig / EsClient var esConfig *esclient.Config var esClient esclient.Client - if so.config.Persistence.AdvancedVisibilityConfigExist() { - advancedVisibilityStore, ok := so.config.Persistence.DataStores[so.config.Persistence.AdvancedVisibilityStore] - if !ok { - return serverOptionsProvider{}, fmt.Errorf("persistence config: advanced visibility datastore %q: missing config", so.config.Persistence.AdvancedVisibilityStore) - } + if persistenceConfig.StandardVisibilityConfigExist() && + persistenceConfig.DataStores[persistenceConfig.VisibilityStore].Elasticsearch != nil { + esConfig = persistenceConfig.DataStores[persistenceConfig.VisibilityStore].Elasticsearch + } else if persistenceConfig.SecondaryVisibilityConfigExist() && + persistenceConfig.DataStores[persistenceConfig.SecondaryVisibilityStore].Elasticsearch != nil { + esConfig = persistenceConfig.DataStores[persistenceConfig.SecondaryVisibilityStore].Elasticsearch + } else if persistenceConfig.AdvancedVisibilityConfigExist() { + esConfig = persistenceConfig.DataStores[persistenceConfig.AdvancedVisibilityStore].Elasticsearch + } + + if esConfig != nil { esHttpClient := so.elasticsearchHttpClient if esHttpClient == nil { var err error - esHttpClient, err = esclient.NewAwsHttpClient(advancedVisibilityStore.Elasticsearch.AWSRequestSigning) + esHttpClient, err = esclient.NewAwsHttpClient(esConfig.AWSRequestSigning) if err != nil { return serverOptionsProvider{}, fmt.Errorf("unable to create AWS HTTP client for Elasticsearch: %w", err) } } - esConfig = advancedVisibilityStore.Elasticsearch - esClient, err = esclient.NewClient(esConfig, esHttpClient, logger) if err != nil { return serverOptionsProvider{}, fmt.Errorf("unable to create Elasticsearch client: %w", err) @@ -644,13 +649,17 @@ func ApplyClusterMetadataConfigProvider( } defer clusterMetadataManager.Close() - var indexName string - var initialIndexSearchAttributes map[string]*persistencespb.IndexSearchAttributes - if config.Persistence.IsSQLVisibilityStore() { - indexName = config.Persistence.DataStores[config.Persistence.VisibilityStore].SQL.DatabaseName - initialIndexSearchAttributes = map[string]*persistencespb.IndexSearchAttributes{ - indexName: searchattribute.GetSqlDbIndexSearchAttributes(), - } + var sqlIndexNames []string + initialIndexSearchAttributes := make(map[string]*persistencespb.IndexSearchAttributes) + if ds := config.Persistence.GetVisibilityStoreConfig(); ds.SQL != nil { + indexName := ds.GetIndexName() + sqlIndexNames = append(sqlIndexNames, indexName) + initialIndexSearchAttributes[indexName] = searchattribute.GetSqlDbIndexSearchAttributes() + } + if ds := config.Persistence.GetSecondaryVisibilityStoreConfig(); ds.SQL != nil { + indexName := ds.GetIndexName() + sqlIndexNames = append(sqlIndexNames, indexName) + initialIndexSearchAttributes[indexName] = searchattribute.GetSqlDbIndexSearchAttributes() } clusterData := config.ClusterMetadata @@ -712,14 +721,18 @@ func ApplyClusterMetadataConfigProvider( // TODO (rodrigozhou): Remove this block for v1.21. // Handle registering custom search attributes when upgrading to v1.20. - if config.Persistence.IsSQLVisibilityStore() { + if len(sqlIndexNames) > 0 { needSave := false if currentMetadata.IndexSearchAttributes == nil { currentMetadata.IndexSearchAttributes = initialIndexSearchAttributes needSave = true - } else if _, ok := currentMetadata.IndexSearchAttributes[indexName]; !ok { - currentMetadata.IndexSearchAttributes[indexName] = searchattribute.GetSqlDbIndexSearchAttributes() - needSave = true + } else { + for _, indexName := range sqlIndexNames { + if _, ok := currentMetadata.IndexSearchAttributes[indexName]; !ok { + currentMetadata.IndexSearchAttributes[indexName] = searchattribute.GetSqlDbIndexSearchAttributes() + needSave = true + } + } } if needSave {