Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor configs for visibility #4095

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ type (
DefaultStore string `yaml:"defaultStore" validate:"nonzero"`
// VisibilityStore is the name of the datastore to be used for visibility records
VisibilityStore string `yaml:"visibilityStore"`
// SecondaryVisibilityStore is the name of the secondary datastore to be used for visibility records
SecondaryVisibilityStore string `yaml:"secondaryVisibilityStore"`
// DEPRECATED: use VisibilityStore key instead of AdvancedVisibilityStore
// AdvancedVisibilityStore is the name of the datastore to be used for visibility records
AdvancedVisibilityStore string `yaml:"advancedVisibilityStore"`
// NumHistoryShards is the desired number of history shards. This config doesn't
Expand Down
156 changes: 139 additions & 17 deletions common/config/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"strings"

"github.com/gocql/gocql"
"go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client"
)

const (
Expand All @@ -54,6 +55,68 @@ func (c *Persistence) Validate() error {
if c.VisibilityStore != "" {
stores = append(stores, c.VisibilityStore)
}
if c.SecondaryVisibilityStore != "" {
stores = append(stores, c.SecondaryVisibilityStore)
}
if c.AdvancedVisibilityStore != "" {
stores = append(stores, c.AdvancedVisibilityStore)
}

// There are 3 config keys:
// - visibilityStore: can set any data store
// - secondaryVisibilityStore: can set any data store
// - advancedVisibilityStore: can only set elasticsearch data store
// If visibilityStore is set, then it's always the primary.
// If secondaryVisibilityStore is set, it's always the secondary.
//
// Valid dual visibility combinations (order: primary, secondary):
// - visibilityStore (standard), secondaryVisibilityStore (any)
// - visibilityStore (standard), advancedVisibilityStore (es)
// - visibilityStore (advanced sql), secondaryVisibilityStore (advanced sql)
// - visibilityStore (es), visibilityStore (es) [via elasticsearch.indices config]
// - advancedVisibilityStore (es), advancedVisibilityStore (es) [via elasticsearch.indices config]
//
// Invalid dual visibility combinations:
// - visibilityStore (advanced sql), secondaryVisibilityStore (standard, es)
// - visibilityStore (advanced sql), advancedVisibilityStore (es)
// - visibilityStore (es), secondaryVisibilityStore (any)
// - visibilityStore (es), advancedVisibilityStore (es)
// - advancedVisibilityStore (es), secondaryVisibilityStore (any)
//
// The validation for dual visibility pair (advanced sql, advanced sql) is in visibility factory
// due to circular dependency. This will be better after standard visibility is removed.

if c.VisibilityStore == "" && c.AdvancedVisibilityStore == "" {
return errors.New("persistence config: visibilityStore must be specified")
}
if c.SecondaryVisibilityStore != "" && c.AdvancedVisibilityStore != "" {
return errors.New(
"persistence config: cannot specify both secondaryVisibilityStore and " +
"advancedVisibilityStore",
)
}
if c.AdvancedVisibilityStore != "" && c.DataStores[c.AdvancedVisibilityStore].Elasticsearch == nil {
return fmt.Errorf(
"persistence config: advanced visibility datastore %q: missing elasticsearch config",
c.AdvancedVisibilityStore,
)
}
if c.DataStores[c.VisibilityStore].Elasticsearch != nil &&
(c.SecondaryVisibilityStore != "" || c.AdvancedVisibilityStore != "") {
return errors.New(
"persistence config: cannot set secondaryVisibilityStore or advancedVisibilityStore " +
"when visibilityStore is setting elasticsearch datastore",
)
}
if c.DataStores[c.SecondaryVisibilityStore].Elasticsearch.GetSecondaryVisibilityIndex() != "" {
return fmt.Errorf(
"persistence config: secondary visibility datastore %q: elasticsearch config: "+
"cannot set secondary_visibility",
c.SecondaryVisibilityStore,
)
}

cntEsConfigs := 0
for _, st := range stores {
ds, ok := c.DataStores[st]
if !ok {
Expand All @@ -62,10 +125,17 @@ func (c *Persistence) Validate() error {
if err := ds.Validate(); err != nil {
return fmt.Errorf("persistence config: datastore %q: %s", st, err.Error())
}
if ds.Elasticsearch != nil {
cntEsConfigs++
}
}

if err := c.validateAdvancedVisibility(); err != nil {
return err
if cntEsConfigs > 1 {
return fmt.Errorf(
"persistence config: cannot have more than one Elasticsearch visibility store config " +
"(use `elasticsearch.indices.secondary_visibility` config key if you need to set a " +
"secondary Elasticsearch visibility store)",
)
}

return nil
Expand All @@ -76,34 +146,75 @@ func (c *Persistence) StandardVisibilityConfigExist() bool {
return c.VisibilityStore != ""
}

// SecondaryVisibilityConfigExist returns whether user specified secondaryVisibilityStore in config
func (c *Persistence) SecondaryVisibilityConfigExist() bool {
return c.SecondaryVisibilityStore != ""
}

// AdvancedVisibilityConfigExist returns whether user specified advancedVisibilityStore in config
func (c *Persistence) AdvancedVisibilityConfigExist() bool {
return c.AdvancedVisibilityStore != ""
}

func (c *Persistence) IsSQLVisibilityStore() bool {
return c.StandardVisibilityConfigExist() && c.DataStores[c.VisibilityStore].SQL != nil
return (c.StandardVisibilityConfigExist() && c.DataStores[c.VisibilityStore].SQL != nil) ||
(c.SecondaryVisibilityConfigExist() && c.DataStores[c.SecondaryVisibilityStore].SQL != nil)
}

func (c *Persistence) validateAdvancedVisibility() error {
if !c.StandardVisibilityConfigExist() && !c.AdvancedVisibilityConfigExist() {
return errors.New("persistence config: one of visibilityStore or advancedVisibilityStore must be specified")
func (c *Persistence) GetVisibilityStoreConfig() DataStore {
if c.VisibilityStore != "" {
return c.DataStores[c.VisibilityStore]
}

if !c.AdvancedVisibilityConfigExist() {
return nil
if c.AdvancedVisibilityStore != "" {
return c.DataStores[c.AdvancedVisibilityStore]
}
// Based on validation above, this should never happen.
return DataStore{}
}

advancedVisibilityDataStore, ok := c.DataStores[c.AdvancedVisibilityStore]
if !ok {
return fmt.Errorf("persistence config: advanced visibility datastore %q: missing config", c.AdvancedVisibilityStore)
func (c *Persistence) GetSecondaryVisibilityStoreConfig() DataStore {
if c.SecondaryVisibilityStore != "" {
return c.DataStores[c.SecondaryVisibilityStore]
}

if err := advancedVisibilityDataStore.Elasticsearch.Validate(c.AdvancedVisibilityStore); err != nil {
return err
if c.VisibilityStore != "" {
if c.AdvancedVisibilityStore != "" {
return c.DataStores[c.AdvancedVisibilityStore]
}
ds := c.DataStores[c.VisibilityStore]
if ds.Elasticsearch != nil && ds.Elasticsearch.GetSecondaryVisibilityIndex() != "" {
esConfig := *ds.Elasticsearch
esConfig.Indices = map[string]string{
client.VisibilityAppName: ds.Elasticsearch.GetSecondaryVisibilityIndex(),
}
ds.Elasticsearch = &esConfig
return ds
}
}
if c.AdvancedVisibilityStore != "" {
ds := c.DataStores[c.AdvancedVisibilityStore]
if ds.Elasticsearch != nil && ds.Elasticsearch.GetSecondaryVisibilityIndex() != "" {
esConfig := *ds.Elasticsearch
esConfig.Indices = map[string]string{
client.VisibilityAppName: ds.Elasticsearch.GetSecondaryVisibilityIndex(),
}
ds.Elasticsearch = &esConfig
return ds
}
}
return DataStore{}
}

return nil
func (ds *DataStore) GetIndexName() string {
switch {
case ds.SQL != nil:
return ds.SQL.DatabaseName
case ds.Cassandra != nil:
return ds.Cassandra.Keyspace
case ds.Elasticsearch != nil:
return ds.Elasticsearch.GetVisibilityIndex()
default:
return ""
}
}

// Validate validates the data store config
Expand All @@ -118,8 +229,14 @@ func (ds *DataStore) Validate() error {
if ds.CustomDataStoreConfig != nil {
storeConfigCount++
}
if ds.Elasticsearch != nil {
storeConfigCount++
}
if storeConfigCount != 1 {
return errors.New("must provide config for one and only one for DataStore of cassandra or sql or custom stores")
return errors.New(
"must provide config for one and only one datastore: " +
"elasticsearch, cassandra, sql or custom store",
)
}

if ds.SQL != nil && ds.SQL.TaskScanPartitions == 0 {
Expand All @@ -130,6 +247,11 @@ func (ds *DataStore) Validate() error {
return err
}
}
if ds.Elasticsearch != nil {
if err := ds.Elasticsearch.Validate(); err != nil {
return err
}
}
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions common/dynamicconfig/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,11 @@ func (c *Collection) GetTaskQueuePartitionsProperty(key Key) IntPropertyFnWithTa
return c.GetIntPropertyFilteredByTaskQueueInfo(key, defaultNumTaskQueuePartitions)
}

func (c *Collection) HasKey(key Key) bool {
cvs := c.client.GetValue(key)
return len(cvs) > 0
}

func findMatch(cvs, defaultCVs []ConstrainedValue, precedence []Constraints) (any, error) {
if len(cvs)+len(defaultCVs) == 0 {
return nil, errKeyNotPresent
Expand Down
10 changes: 10 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (

// keys for system

// DEPRECATED: the following block of configs are deprecated and replaced by the next block of configs
// StandardVisibilityPersistenceMaxReadQPS is the max QPC system host can query standard visibility DB (SQL or Cassandra) for read.
StandardVisibilityPersistenceMaxReadQPS = "system.standardVisibilityPersistenceMaxReadQPS"
// StandardVisibilityPersistenceMaxWriteQPS is the max QPC system host can query standard visibility DB (SQL or Cassandra) for write.
Expand All @@ -54,6 +55,15 @@ const (
EnableReadVisibilityFromES = "system.enableReadVisibilityFromES"
// EnableReadFromSecondaryAdvancedVisibility is the config to enable read from secondary Elasticsearch
EnableReadFromSecondaryAdvancedVisibility = "system.enableReadFromSecondaryAdvancedVisibility"

// VisibilityPersistenceMaxReadQPS is the max QPC system host can query visibility DB for read.
VisibilityPersistenceMaxReadQPS = "system.visibilityPersistenceMaxReadQPS"
// VisibilityPersistenceMaxWriteQPS is the max QPC system host can query visibility DB for write.
VisibilityPersistenceMaxWriteQPS = "system.visibilityPersistenceMaxWriteQPS"
// EnableReadFromSecondaryVisibility is the config to enable read from secondary visibility
EnableReadFromSecondaryVisibility = "system.enableReadFromSecondaryVisibility"
// SecondaryVisibilityWritingMode is key for how to write to secondary visibility
SecondaryVisibilityWritingMode = "system.secondaryVisibilityWritingMode"
// VisibilityDisableOrderByClause is the config to disable ORDERY BY clause for Elasticsearch
VisibilityDisableOrderByClause = "system.visibilityDisableOrderByClause"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,18 @@ func (s *VisibilityPersistenceSuite) SetupSuite() {
s.controller = gomock.NewController(s.T())
s.SearchAttributesProvider = searchattribute.NewTestProvider()
s.SearchAttributesMapperProvider = searchattribute.NewTestMapperProvider(nil)
s.VisibilityMgr, err = visibility.NewStandardManager(
s.VisibilityMgr, err = visibility.NewManager(
cfg,
resolver.NewNoopResolver(),
nil,
nil,
s.SearchAttributesProvider,
s.SearchAttributesMapperProvider,
dynamicconfig.GetIntPropertyFn(1000),
dynamicconfig.GetIntPropertyFn(1000),
dynamicconfig.GetBoolPropertyFnFilteredByNamespace(false),
dynamicconfig.GetStringPropertyFn(visibility.SecondaryVisibilityWritingModeOff),
dynamicconfig.GetBoolPropertyFnFilteredByNamespace(false),
metrics.NoopMetricsHandler,
s.Logger,
)
Expand Down
86 changes: 86 additions & 0 deletions common/persistence/visibility/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package visibility

import (
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/persistence/sql/sqlplugin/mysql"
"go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql"
"go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite"
Expand All @@ -47,6 +48,91 @@ func DefaultAdvancedVisibilityWritingMode(advancedVisibilityConfigExist bool) st
return SecondaryVisibilityWritingModeOff
}

//nolint:revive
func GetVisibilityPersistenceMaxReadQPS(
dc *dynamicconfig.Collection,
advancedVisibilityStoreConfigExists bool,
) dynamicconfig.IntPropertyFn {
if dc.HasKey(dynamicconfig.VisibilityPersistenceMaxReadQPS) {
return dc.GetIntProperty(dynamicconfig.VisibilityPersistenceMaxReadQPS, 9000)
}
if advancedVisibilityStoreConfigExists {
return dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxReadQPS, 9000)
}
return dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxReadQPS, 9000)
}

//nolint:revive
func GetVisibilityPersistenceMaxWriteQPS(
dc *dynamicconfig.Collection,
advancedVisibilityStoreConfigExists bool,
) dynamicconfig.IntPropertyFn {
if dc.HasKey(dynamicconfig.VisibilityPersistenceMaxWriteQPS) {
return dc.GetIntProperty(dynamicconfig.VisibilityPersistenceMaxWriteQPS, 9000)
}
if advancedVisibilityStoreConfigExists {
return dc.GetIntProperty(dynamicconfig.AdvancedVisibilityPersistenceMaxWriteQPS, 9000)
}
return dc.GetIntProperty(dynamicconfig.StandardVisibilityPersistenceMaxWriteQPS, 9000)
}

//nolint:revive
func GetEnableReadFromSecondaryVisibilityConfig(
dc *dynamicconfig.Collection,
visibilityStoreConfigExists bool,
advancedVisibilityStoreConfigExists bool,
) dynamicconfig.BoolPropertyFnWithNamespaceFilter {
if dc.HasKey(dynamicconfig.EnableReadFromSecondaryVisibility) {
return dc.GetBoolPropertyFnWithNamespaceFilter(
dynamicconfig.EnableReadFromSecondaryVisibility,
false,
)
}
if visibilityStoreConfigExists && advancedVisibilityStoreConfigExists {
return dc.GetBoolPropertyFnWithNamespaceFilter(
dynamicconfig.EnableReadVisibilityFromES,
advancedVisibilityStoreConfigExists,
)
}
if advancedVisibilityStoreConfigExists {
return dc.GetBoolPropertyFnWithNamespaceFilter(
dynamicconfig.EnableReadFromSecondaryAdvancedVisibility,
false,
)
}
return dynamicconfig.GetBoolPropertyFnFilteredByNamespace(false)
}

//nolint:revive
func GetSecondaryVisibilityWritingModeConfig(
dc *dynamicconfig.Collection,
visibilityStoreConfigExists bool,
advancedVisibilityStoreConfigExists bool,
) dynamicconfig.StringPropertyFn {
if dc.HasKey(dynamicconfig.SecondaryVisibilityWritingMode) {
return dc.GetStringProperty(
dynamicconfig.SecondaryVisibilityWritingMode,
SecondaryVisibilityWritingModeOff,
)
}
if visibilityStoreConfigExists && advancedVisibilityStoreConfigExists {
return dc.GetStringProperty(
dynamicconfig.AdvancedVisibilityWritingMode,
DefaultAdvancedVisibilityWritingMode(advancedVisibilityStoreConfigExists),
)
}
if advancedVisibilityStoreConfigExists {
enableWriteToSecondaryAdvancedVisibility := dc.GetBoolProperty(
dynamicconfig.EnableWriteToSecondaryAdvancedVisibility,
false,
)
if enableWriteToSecondaryAdvancedVisibility() {
return dynamicconfig.GetStringPropertyFn(SecondaryVisibilityWritingModeDual)
}
}
return dynamicconfig.GetStringPropertyFn(SecondaryVisibilityWritingModeOff)
}

func AllowListForValidation(storeNames []string) bool {
if len(storeNames) == 0 {
return false
Expand Down
Loading