Skip to content

Commit

Permalink
Refactor configs for visibility (#4095)
Browse files Browse the repository at this point in the history
New dynamic configs for visibility
  • Loading branch information
rodrigozhou authored Apr 24, 2023
1 parent b97be78 commit 12655c4
Show file tree
Hide file tree
Showing 24 changed files with 522 additions and 383 deletions.
3 changes: 3 additions & 0 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ type (
DefaultStore string `yaml:"defaultStore" validate:"nonzero"`
// VisibilityStore is the name of the datastore to be used for visibility records
VisibilityStore string `yaml:"visibilityStore"`
// SecondaryVisibilityStore is the name of the secondary datastore to be used for visibility records
SecondaryVisibilityStore string `yaml:"secondaryVisibilityStore"`
// DEPRECATED: use VisibilityStore key instead of AdvancedVisibilityStore
// AdvancedVisibilityStore is the name of the datastore to be used for visibility records
AdvancedVisibilityStore string `yaml:"advancedVisibilityStore"`
// NumHistoryShards is the desired number of history shards. This config doesn't
Expand Down
156 changes: 139 additions & 17 deletions common/config/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"strings"

"github.com/gocql/gocql"
"go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client"
)

const (
Expand All @@ -54,6 +55,68 @@ func (c *Persistence) Validate() error {
if c.VisibilityStore != "" {
stores = append(stores, c.VisibilityStore)
}
if c.SecondaryVisibilityStore != "" {
stores = append(stores, c.SecondaryVisibilityStore)
}
if c.AdvancedVisibilityStore != "" {
stores = append(stores, c.AdvancedVisibilityStore)
}

// There are 3 config keys:
// - visibilityStore: can set any data store
// - secondaryVisibilityStore: can set any data store
// - advancedVisibilityStore: can only set elasticsearch data store
// If visibilityStore is set, then it's always the primary.
// If secondaryVisibilityStore is set, it's always the secondary.
//
// Valid dual visibility combinations (order: primary, secondary):
// - visibilityStore (standard), secondaryVisibilityStore (any)
// - visibilityStore (standard), advancedVisibilityStore (es)
// - visibilityStore (advanced sql), secondaryVisibilityStore (advanced sql)
// - visibilityStore (es), visibilityStore (es) [via elasticsearch.indices config]
// - advancedVisibilityStore (es), advancedVisibilityStore (es) [via elasticsearch.indices config]
//
// Invalid dual visibility combinations:
// - visibilityStore (advanced sql), secondaryVisibilityStore (standard, es)
// - visibilityStore (advanced sql), advancedVisibilityStore (es)
// - visibilityStore (es), secondaryVisibilityStore (any)
// - visibilityStore (es), advancedVisibilityStore (es)
// - advancedVisibilityStore (es), secondaryVisibilityStore (any)
//
// The validation for dual visibility pair (advanced sql, advanced sql) is in visibility factory
// due to circular dependency. This will be better after standard visibility is removed.

if c.VisibilityStore == "" && c.AdvancedVisibilityStore == "" {
return errors.New("persistence config: visibilityStore must be specified")
}
if c.SecondaryVisibilityStore != "" && c.AdvancedVisibilityStore != "" {
return errors.New(
"persistence config: cannot specify both secondaryVisibilityStore and " +
"advancedVisibilityStore",
)
}
if c.AdvancedVisibilityStore != "" && c.DataStores[c.AdvancedVisibilityStore].Elasticsearch == nil {
return fmt.Errorf(
"persistence config: advanced visibility datastore %q: missing elasticsearch config",
c.AdvancedVisibilityStore,
)
}
if c.DataStores[c.VisibilityStore].Elasticsearch != nil &&
(c.SecondaryVisibilityStore != "" || c.AdvancedVisibilityStore != "") {
return errors.New(
"persistence config: cannot set secondaryVisibilityStore or advancedVisibilityStore " +
"when visibilityStore is setting elasticsearch datastore",
)
}
if c.DataStores[c.SecondaryVisibilityStore].Elasticsearch.GetSecondaryVisibilityIndex() != "" {
return fmt.Errorf(
"persistence config: secondary visibility datastore %q: elasticsearch config: "+
"cannot set secondary_visibility",
c.SecondaryVisibilityStore,
)
}

cntEsConfigs := 0
for _, st := range stores {
ds, ok := c.DataStores[st]
if !ok {
Expand All @@ -62,10 +125,17 @@ func (c *Persistence) Validate() error {
if err := ds.Validate(); err != nil {
return fmt.Errorf("persistence config: datastore %q: %s", st, err.Error())
}
if ds.Elasticsearch != nil {
cntEsConfigs++
}
}

if err := c.validateAdvancedVisibility(); err != nil {
return err
if cntEsConfigs > 1 {
return fmt.Errorf(
"persistence config: cannot have more than one Elasticsearch visibility store config " +
"(use `elasticsearch.indices.secondary_visibility` config key if you need to set a " +
"secondary Elasticsearch visibility store)",
)
}

return nil
Expand All @@ -76,34 +146,75 @@ func (c *Persistence) StandardVisibilityConfigExist() bool {
return c.VisibilityStore != ""
}

// SecondaryVisibilityConfigExist returns whether user specified secondaryVisibilityStore in config
func (c *Persistence) SecondaryVisibilityConfigExist() bool {
return c.SecondaryVisibilityStore != ""
}

// AdvancedVisibilityConfigExist returns whether user specified advancedVisibilityStore in config
func (c *Persistence) AdvancedVisibilityConfigExist() bool {
return c.AdvancedVisibilityStore != ""
}

func (c *Persistence) IsSQLVisibilityStore() bool {
return c.StandardVisibilityConfigExist() && c.DataStores[c.VisibilityStore].SQL != nil
return (c.StandardVisibilityConfigExist() && c.DataStores[c.VisibilityStore].SQL != nil) ||
(c.SecondaryVisibilityConfigExist() && c.DataStores[c.SecondaryVisibilityStore].SQL != nil)
}

func (c *Persistence) validateAdvancedVisibility() error {
if !c.StandardVisibilityConfigExist() && !c.AdvancedVisibilityConfigExist() {
return errors.New("persistence config: one of visibilityStore or advancedVisibilityStore must be specified")
func (c *Persistence) GetVisibilityStoreConfig() DataStore {
if c.VisibilityStore != "" {
return c.DataStores[c.VisibilityStore]
}

if !c.AdvancedVisibilityConfigExist() {
return nil
if c.AdvancedVisibilityStore != "" {
return c.DataStores[c.AdvancedVisibilityStore]
}
// Based on validation above, this should never happen.
return DataStore{}
}

advancedVisibilityDataStore, ok := c.DataStores[c.AdvancedVisibilityStore]
if !ok {
return fmt.Errorf("persistence config: advanced visibility datastore %q: missing config", c.AdvancedVisibilityStore)
func (c *Persistence) GetSecondaryVisibilityStoreConfig() DataStore {
if c.SecondaryVisibilityStore != "" {
return c.DataStores[c.SecondaryVisibilityStore]
}

if err := advancedVisibilityDataStore.Elasticsearch.Validate(c.AdvancedVisibilityStore); err != nil {
return err
if c.VisibilityStore != "" {
if c.AdvancedVisibilityStore != "" {
return c.DataStores[c.AdvancedVisibilityStore]
}
ds := c.DataStores[c.VisibilityStore]
if ds.Elasticsearch != nil && ds.Elasticsearch.GetSecondaryVisibilityIndex() != "" {
esConfig := *ds.Elasticsearch
esConfig.Indices = map[string]string{
client.VisibilityAppName: ds.Elasticsearch.GetSecondaryVisibilityIndex(),
}
ds.Elasticsearch = &esConfig
return ds
}
}
if c.AdvancedVisibilityStore != "" {
ds := c.DataStores[c.AdvancedVisibilityStore]
if ds.Elasticsearch != nil && ds.Elasticsearch.GetSecondaryVisibilityIndex() != "" {
esConfig := *ds.Elasticsearch
esConfig.Indices = map[string]string{
client.VisibilityAppName: ds.Elasticsearch.GetSecondaryVisibilityIndex(),
}
ds.Elasticsearch = &esConfig
return ds
}
}
return DataStore{}
}

return nil
func (ds *DataStore) GetIndexName() string {
switch {
case ds.SQL != nil:
return ds.SQL.DatabaseName
case ds.Cassandra != nil:
return ds.Cassandra.Keyspace
case ds.Elasticsearch != nil:
return ds.Elasticsearch.GetVisibilityIndex()
default:
return ""
}
}

// Validate validates the data store config
Expand All @@ -118,8 +229,14 @@ func (ds *DataStore) Validate() error {
if ds.CustomDataStoreConfig != nil {
storeConfigCount++
}
if ds.Elasticsearch != nil {
storeConfigCount++
}
if storeConfigCount != 1 {
return errors.New("must provide config for one and only one for DataStore of cassandra or sql or custom stores")
return errors.New(
"must provide config for one and only one datastore: " +
"elasticsearch, cassandra, sql or custom store",
)
}

if ds.SQL != nil && ds.SQL.TaskScanPartitions == 0 {
Expand All @@ -130,6 +247,11 @@ func (ds *DataStore) Validate() error {
return err
}
}
if ds.Elasticsearch != nil {
if err := ds.Elasticsearch.Validate(); err != nil {
return err
}
}
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions common/dynamicconfig/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,11 @@ func (c *Collection) GetTaskQueuePartitionsProperty(key Key) IntPropertyFnWithTa
return c.GetIntPropertyFilteredByTaskQueueInfo(key, defaultNumTaskQueuePartitions)
}

func (c *Collection) HasKey(key Key) bool {
cvs := c.client.GetValue(key)
return len(cvs) > 0
}

func findMatch(cvs, defaultCVs []ConstrainedValue, precedence []Constraints) (any, error) {
if len(cvs)+len(defaultCVs) == 0 {
return nil, errKeyNotPresent
Expand Down
10 changes: 10 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (

// keys for system

// DEPRECATED: the following block of configs are deprecated and replaced by the next block of configs
// StandardVisibilityPersistenceMaxReadQPS is the max QPC system host can query standard visibility DB (SQL or Cassandra) for read.
StandardVisibilityPersistenceMaxReadQPS = "system.standardVisibilityPersistenceMaxReadQPS"
// StandardVisibilityPersistenceMaxWriteQPS is the max QPC system host can query standard visibility DB (SQL or Cassandra) for write.
Expand All @@ -54,6 +55,15 @@ const (
EnableReadVisibilityFromES = "system.enableReadVisibilityFromES"
// EnableReadFromSecondaryAdvancedVisibility is the config to enable read from secondary Elasticsearch
EnableReadFromSecondaryAdvancedVisibility = "system.enableReadFromSecondaryAdvancedVisibility"

// VisibilityPersistenceMaxReadQPS is the max QPC system host can query visibility DB for read.
VisibilityPersistenceMaxReadQPS = "system.visibilityPersistenceMaxReadQPS"
// VisibilityPersistenceMaxWriteQPS is the max QPC system host can query visibility DB for write.
VisibilityPersistenceMaxWriteQPS = "system.visibilityPersistenceMaxWriteQPS"
// EnableReadFromSecondaryVisibility is the config to enable read from secondary visibility
EnableReadFromSecondaryVisibility = "system.enableReadFromSecondaryVisibility"
// SecondaryVisibilityWritingMode is key for how to write to secondary visibility
SecondaryVisibilityWritingMode = "system.secondaryVisibilityWritingMode"
// VisibilityDisableOrderByClause is the config to disable ORDERY BY clause for Elasticsearch
VisibilityDisableOrderByClause = "system.visibilityDisableOrderByClause"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,18 @@ func (s *VisibilityPersistenceSuite) SetupSuite() {
s.controller = gomock.NewController(s.T())
s.SearchAttributesProvider = searchattribute.NewTestProvider()
s.SearchAttributesMapperProvider = searchattribute.NewTestMapperProvider(nil)
s.VisibilityMgr, err = visibility.NewStandardManager(
s.VisibilityMgr, err = visibility.NewManager(
cfg,
resolver.NewNoopResolver(),
nil,
nil,
s.SearchAttributesProvider,
s.SearchAttributesMapperProvider,
dynamicconfig.GetIntPropertyFn(1000),
dynamicconfig.GetIntPropertyFn(1000),
dynamicconfig.GetBoolPropertyFnFilteredByNamespace(false),
dynamicconfig.GetStringPropertyFn(visibility.SecondaryVisibilityWritingModeOff),
dynamicconfig.GetBoolPropertyFnFilteredByNamespace(false),
metrics.NoopMetricsHandler,
s.Logger,
)
Expand Down
86 changes: 86 additions & 0 deletions common/persistence/visibility/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package visibility

import (
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/persistence/sql/sqlplugin/mysql"
"go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql"
"go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite"
Expand All @@ -47,6 +48,91 @@ func DefaultAdvancedVisibilityWritingMode(advancedVisibilityConfigExist bool) st
return SecondaryVisibilityWritingModeOff
}

//nolint:revive
func GetVisibilityPersistenceMaxReadQPS(
dc *dynamicconfig.Collection,
advancedVisibilityStoreConfigExists bool,
) dynamicconfig.IntPropertyFn {
if dc.HasKey(dynamicconfig.VisibilityPersistenceMaxReadQPS) {
return dc.GetIntProperty(dynamicconfig.VisibilityPersistenceMaxReadQPS, 9000)
}
if advancedVisibilityStoreConfigExists {
return dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxReadQPS, 9000)
}
return dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxReadQPS, 9000)
}

//nolint:revive
func GetVisibilityPersistenceMaxWriteQPS(
dc *dynamicconfig.Collection,
advancedVisibilityStoreConfigExists bool,
) dynamicconfig.IntPropertyFn {
if dc.HasKey(dynamicconfig.VisibilityPersistenceMaxWriteQPS) {
return dc.GetIntProperty(dynamicconfig.VisibilityPersistenceMaxWriteQPS, 9000)
}
if advancedVisibilityStoreConfigExists {
return dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxWriteQPS, 9000)
}
return dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxWriteQPS, 9000)
}

//nolint:revive
func GetEnableReadFromSecondaryVisibilityConfig(
dc *dynamicconfig.Collection,
visibilityStoreConfigExists bool,
advancedVisibilityStoreConfigExists bool,
) dynamicconfig.BoolPropertyFnWithNamespaceFilter {
if dc.HasKey(dynamicconfig.EnableReadFromSecondaryVisibility) {
return dc.GetBoolPropertyFnWithNamespaceFilter(
dynamicconfig.EnableReadFromSecondaryVisibility,
false,
)
}
if visibilityStoreConfigExists && advancedVisibilityStoreConfigExists {
return dc.GetBoolPropertyFnWithNamespaceFilter(
dynamicconfig.EnableReadVisibilityFromES,
advancedVisibilityStoreConfigExists,
)
}
if advancedVisibilityStoreConfigExists {
return dc.GetBoolPropertyFnWithNamespaceFilter(
dynamicconfig.EnableReadFromSecondaryAdvancedVisibility,
false,
)
}
return dynamicconfig.GetBoolPropertyFnFilteredByNamespace(false)
}

//nolint:revive
func GetSecondaryVisibilityWritingModeConfig(
dc *dynamicconfig.Collection,
visibilityStoreConfigExists bool,
advancedVisibilityStoreConfigExists bool,
) dynamicconfig.StringPropertyFn {
if dc.HasKey(dynamicconfig.SecondaryVisibilityWritingMode) {
return dc.GetStringProperty(
dynamicconfig.SecondaryVisibilityWritingMode,
SecondaryVisibilityWritingModeOff,
)
}
if visibilityStoreConfigExists && advancedVisibilityStoreConfigExists {
return dc.GetStringProperty(
dynamicconfig.AdvancedVisibilityWritingMode,
DefaultAdvancedVisibilityWritingMode(advancedVisibilityStoreConfigExists),
)
}
if advancedVisibilityStoreConfigExists {
enableWriteToSecondaryAdvancedVisibility := dc.GetBoolProperty(
dynamicconfig.EnableWriteToSecondaryAdvancedVisibility,
false,
)
if enableWriteToSecondaryAdvancedVisibility() {
return dynamicconfig.GetStringPropertyFn(SecondaryVisibilityWritingModeDual)
}
}
return dynamicconfig.GetStringPropertyFn(SecondaryVisibilityWritingModeOff)
}

func AllowListForValidation(storeNames []string) bool {
if len(storeNames) == 0 {
return false
Expand Down
Loading

0 comments on commit 12655c4

Please sign in to comment.