diff --git a/service/worker/esanalyzer/analyzer.go b/service/worker/esanalyzer/analyzer.go index 4e901e2f3e5..1c4cf4b8585 100644 --- a/service/worker/esanalyzer/analyzer.go +++ b/service/worker/esanalyzer/analyzer.go @@ -40,10 +40,18 @@ import ( es "github.com/uber/cadence/common/elasticsearch" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/pinot" "github.com/uber/cadence/common/resource" "github.com/uber/cadence/service/worker/workercommon" ) +type readMode string + +const ( + Pinot readMode = "pinot" + ES readMode = "es" +) + type ( // Analyzer is the background sub-system to query ElasticSearch and execute mitigations Analyzer struct { @@ -51,6 +59,8 @@ type ( frontendClient frontend.Client clientBean client.Bean esClient es.GenericClient + pinotClient pinot.GenericClient + readMode readMode logger log.Logger tallyScope tally.Scope visibilityIndexName string @@ -98,6 +108,7 @@ func New( frontendClient frontend.Client, clientBean client.Bean, esClient es.GenericClient, + pinotClient pinot.GenericClient, esConfig *config.ElasticSearchConfig, logger log.Logger, tallyScope tally.Scope, @@ -105,11 +116,20 @@ func New( domainCache cache.DomainCache, config *Config, ) *Analyzer { + var mode readMode + if esClient != nil { + mode = ES + } else if pinotClient != nil { + mode = Pinot + } + return &Analyzer{ svcClient: svcClient, frontendClient: frontendClient, clientBean: clientBean, esClient: esClient, + pinotClient: pinotClient, + readMode: mode, logger: logger, tallyScope: tallyScope, visibilityIndexName: esConfig.Indices[common.VisibilityAppName], diff --git a/service/worker/esanalyzer/analyzer_test.go b/service/worker/esanalyzer/analyzer_test.go index 9d7e127d781..794358132fa 100644 --- a/service/worker/esanalyzer/analyzer_test.go +++ b/service/worker/esanalyzer/analyzer_test.go @@ -28,6 +28,7 @@ import ( "time" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/uber-go/tally" @@ -35,11 +36,14 @@ import ( "go.uber.org/cadence/testsuite" "go.uber.org/cadence/worker" "go.uber.org/cadence/workflow" + "go.uber.org/zap" "github.com/uber/cadence/client" "github.com/uber/cadence/client/admin" + "github.com/uber/cadence/common" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/elasticsearch" esMocks "github.com/uber/cadence/common/elasticsearch/mocks" @@ -47,6 +51,7 @@ import ( "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/metrics/mocks" "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/pinot" "github.com/uber/cadence/service/history/resource" ) @@ -340,3 +345,184 @@ func (s *esanalyzerWorkflowTestSuite) TestEmitWorkflowTypeCountMetricsActivity() s.NoError(err) } + +func TestNewAnalyzer(t *testing.T) { + mockESConfig := &config.ElasticSearchConfig{ + Indices: map[string]string{ + common.VisibilityAppName: "test", + }, + } + + mockESClient := &esMocks.GenericClient{} + testAnalyzer1 := New(nil, nil, nil, mockESClient, nil, mockESConfig, nil, nil, nil, nil, nil) + + mockPinotClient := &pinot.MockGenericClient{} + testAnalyzer2 := New(nil, nil, nil, nil, mockPinotClient, mockESConfig, nil, nil, nil, nil, nil) + + assert.Equal(t, testAnalyzer1.readMode, ES) + assert.Equal(t, testAnalyzer2.readMode, Pinot) +} + +func TestEmitWorkflowTypeCountMetricsESErrorCases(t *testing.T) { + mockESConfig := &config.ElasticSearchConfig{ + Indices: map[string]string{ + common.VisibilityAppName: "test", + }, + } + + ctrl := gomock.NewController(t) + mockESClient := &esMocks.GenericClient{} + mockDomainCache := cache.NewMockDomainCache(ctrl) + testAnalyzer := New(nil, nil, nil, mockESClient, nil, mockESConfig, log.NewNoop(), tally.NoopScope, nil, mockDomainCache, nil) + testWorkflow := &Workflow{analyzer: testAnalyzer} + + tests := map[string]struct { + domainCacheAffordance func(mockDomainCache *cache.MockDomainCache) + ESClientAffordance func(mockESClient *esMocks.GenericClient) + expectedErr error + }{ + "Case1: error getting domain": { + domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(nil, fmt.Errorf("error")).Times(1) + }, + ESClientAffordance: func(mockESClient *esMocks.GenericClient) {}, + expectedErr: fmt.Errorf("error"), + }, + "Case2: error ES searchRaw": { + domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest( + &persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil) + }, + ESClientAffordance: func(mockESClient *esMocks.GenericClient) { + mockESClient.On("SearchRaw", mock.Anything, mock.Anything, mock.Anything).Return( + nil, fmt.Errorf("error")).Times(1) + }, + expectedErr: fmt.Errorf("error"), + }, + "Case3: foundAggregation is false": { + domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest( + &persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil) + }, + ESClientAffordance: func(mockESClient *esMocks.GenericClient) { + mockESClient.On("SearchRaw", mock.Anything, mock.Anything, mock.Anything).Return( + &elasticsearch.RawResponse{ + Aggregations: map[string]json.RawMessage{}, + }, nil).Times(1) + }, + expectedErr: nil, + }, + "Case4: error unmarshalling aggregation": { + domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest( + &persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil) + }, + ESClientAffordance: func(mockESClient *esMocks.GenericClient) { + mockESClient.On("SearchRaw", mock.Anything, mock.Anything, mock.Anything).Return( + &elasticsearch.RawResponse{ + Aggregations: map[string]json.RawMessage{ + "wftypes": []byte("invalid"), + }, + }, nil).Times(1) + }, + expectedErr: fmt.Errorf("invalid character 'i' looking for beginning of value"), + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + // Set up mocks + test.domainCacheAffordance(mockDomainCache) + test.ESClientAffordance(mockESClient) + + err := testWorkflow.emitWorkflowTypeCountMetricsES(context.Background(), "test-domain", zap.NewNop()) + if err == nil { + assert.Equal(t, test.expectedErr, err) + } else { + assert.Equal(t, test.expectedErr.Error(), err.Error()) + } + }) + } +} + +func TestEmitWorkflowVersionMetricsESErrorCases(t *testing.T) { + mockESConfig := &config.ElasticSearchConfig{ + Indices: map[string]string{ + common.VisibilityAppName: "test", + }, + } + + ctrl := gomock.NewController(t) + mockESClient := &esMocks.GenericClient{} + mockDomainCache := cache.NewMockDomainCache(ctrl) + testAnalyzer := New(nil, nil, nil, mockESClient, nil, mockESConfig, log.NewNoop(), tally.NoopScope, nil, mockDomainCache, nil) + testWorkflow := &Workflow{analyzer: testAnalyzer} + + tests := map[string]struct { + domainCacheAffordance func(mockDomainCache *cache.MockDomainCache) + ESClientAffordance func(mockESClient *esMocks.GenericClient) + expectedErr error + }{ + "Case1: error getting domain": { + domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(nil, fmt.Errorf("error")).Times(1) + }, + ESClientAffordance: func(mockESClient *esMocks.GenericClient) {}, + expectedErr: fmt.Errorf("error"), + }, + "Case2: error ES searchRaw": { + domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest( + &persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil) + }, + ESClientAffordance: func(mockESClient *esMocks.GenericClient) { + mockESClient.On("SearchRaw", mock.Anything, mock.Anything, mock.Anything).Return( + nil, fmt.Errorf("error")).Times(1) + }, + expectedErr: fmt.Errorf("error"), + }, + "Case3: foundAggregation is false": { + domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest( + &persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil) + }, + ESClientAffordance: func(mockESClient *esMocks.GenericClient) { + mockESClient.On("SearchRaw", mock.Anything, mock.Anything, mock.Anything).Return( + &elasticsearch.RawResponse{ + Aggregations: map[string]json.RawMessage{}, + }, nil).Times(1) + }, + expectedErr: nil, + }, + "Case4: error unmarshalling aggregation": { + domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest( + &persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil) + }, + ESClientAffordance: func(mockESClient *esMocks.GenericClient) { + mockESClient.On("SearchRaw", mock.Anything, mock.Anything, mock.Anything).Return( + &elasticsearch.RawResponse{ + Aggregations: map[string]json.RawMessage{ + "wftypes": []byte("invalid"), + }, + }, nil).Times(1) + }, + expectedErr: fmt.Errorf("invalid character 'i' looking for beginning of value"), + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + // Set up mocks + test.domainCacheAffordance(mockDomainCache) + test.ESClientAffordance(mockESClient) + + err := testWorkflow.emitWorkflowVersionMetricsES(context.Background(), "test-domain", zap.NewNop()) + if err == nil { + assert.Equal(t, test.expectedErr, err) + } else { + assert.Equal(t, test.expectedErr.Error(), err.Error()) + } + }) + } +} diff --git a/service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go b/service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go index d48f57e68d5..f6027335276 100644 --- a/service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go +++ b/service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go @@ -146,51 +146,64 @@ func (w *Workflow) emitWorkflowTypeCountMetrics(ctx context.Context) error { return err } for _, domainName := range workflowMetricDomainNames { - wfTypeCountEsQuery, err := w.getDomainWorkflowTypeCountQuery(domainName) - if err != nil { - logger.Error("Failed to get ElasticSearch query to find domain workflow type Info", - zap.Error(err), - zap.String("DomainName", domainName), - ) - return err + switch w.analyzer.readMode { + case ES: + err = w.emitWorkflowTypeCountMetricsES(ctx, domainName, logger) + default: + err = w.emitWorkflowTypeCountMetricsES(ctx, domainName, logger) } - response, err := w.analyzer.esClient.SearchRaw(ctx, w.analyzer.visibilityIndexName, wfTypeCountEsQuery) if err != nil { - logger.Error("Failed to query ElasticSearch to find workflow type count Info", - zap.Error(err), - zap.String("VisibilityQuery", wfTypeCountEsQuery), - zap.String("DomainName", domainName), - ) return err } - agg, foundAggregation := response.Aggregations[workflowTypesAggKey] - - if !foundAggregation { - logger.Error("ElasticSearch error: aggregation failed.", - zap.Error(err), - zap.String("Aggregation", string(agg)), - zap.String("DomainName", domainName), - zap.String("VisibilityQuery", wfTypeCountEsQuery), - ) - return err - } - var domainWorkflowTypeCount DomainWorkflowTypeCount - err = json.Unmarshal(agg, &domainWorkflowTypeCount) - if err != nil { - logger.Error("ElasticSearch error parsing aggregation.", - zap.Error(err), - zap.String("Aggregation", string(agg)), - zap.String("DomainName", domainName), - zap.String("VisibilityQuery", wfTypeCountEsQuery), - ) - return err - } - for _, workflowType := range domainWorkflowTypeCount.WorkflowTypes { - w.analyzer.tallyScope.Tagged( - map[string]string{domainTag: domainName, workflowTypeTag: workflowType.AggregateKey}, - ).Gauge(workflowTypeCountMetrics).Update(float64(workflowType.AggregateCount)) - } } } return nil } + +func (w *Workflow) emitWorkflowTypeCountMetricsES(ctx context.Context, domainName string, logger *zap.Logger) error { + wfTypeCountEsQuery, err := w.getDomainWorkflowTypeCountQuery(domainName) + if err != nil { + logger.Error("Failed to get ElasticSearch query to find domain workflow type Info", + zap.Error(err), + zap.String("DomainName", domainName), + ) + return err + } + response, err := w.analyzer.esClient.SearchRaw(ctx, w.analyzer.visibilityIndexName, wfTypeCountEsQuery) + if err != nil { + logger.Error("Failed to query ElasticSearch to find workflow type count Info", + zap.Error(err), + zap.String("VisibilityQuery", wfTypeCountEsQuery), + zap.String("DomainName", domainName), + ) + return err + } + agg, foundAggregation := response.Aggregations[workflowTypesAggKey] + + if !foundAggregation { + logger.Error("ElasticSearch error: aggregation failed.", + zap.Error(err), + zap.String("Aggregation", string(agg)), + zap.String("DomainName", domainName), + zap.String("VisibilityQuery", wfTypeCountEsQuery), + ) + return err + } + var domainWorkflowTypeCount DomainWorkflowTypeCount + err = json.Unmarshal(agg, &domainWorkflowTypeCount) + if err != nil { + logger.Error("ElasticSearch error parsing aggregation.", + zap.Error(err), + zap.String("Aggregation", string(agg)), + zap.String("DomainName", domainName), + zap.String("VisibilityQuery", wfTypeCountEsQuery), + ) + return err + } + for _, workflowType := range domainWorkflowTypeCount.WorkflowTypes { + w.analyzer.tallyScope.Tagged( + map[string]string{domainTag: domainName, workflowTypeTag: workflowType.AggregateKey}, + ).Gauge(workflowTypeCountMetrics).Update(float64(workflowType.AggregateCount)) + } + return nil +} diff --git a/service/worker/esanalyzer/workflow.go b/service/worker/esanalyzer/workflow.go index ea5281134d8..c3bc68866f7 100644 --- a/service/worker/esanalyzer/workflow.go +++ b/service/worker/esanalyzer/workflow.go @@ -157,52 +157,65 @@ func (w *Workflow) emitWorkflowVersionMetrics(ctx context.Context) error { return err } for _, domainName := range workflowMetricDomainNames { - wfVersionEsQuery, err := w.getWorkflowVersionQuery(domainName) - if err != nil { - logger.Error("Failed to get ElasticSearch query to find workflow version Info", - zap.Error(err), - zap.String("DomainName", domainName), - ) - return err + switch w.analyzer.readMode { + case ES: + err = w.emitWorkflowVersionMetricsES(ctx, domainName, logger) + default: + err = w.emitWorkflowVersionMetricsES(ctx, domainName, logger) } - response, err := w.analyzer.esClient.SearchRaw(ctx, w.analyzer.visibilityIndexName, wfVersionEsQuery) if err != nil { - logger.Error("Failed to query ElasticSearch to find workflow version Info", - zap.Error(err), - zap.String("VisibilityQuery", wfVersionEsQuery), - zap.String("DomainName", domainName), - ) return err } - agg, foundAggregation := response.Aggregations[workflowTypesAggKey] + } + } + return nil +} - if !foundAggregation { - logger.Error("ElasticSearch error: aggregation failed.", - zap.Error(err), - zap.String("Aggregation", string(agg)), - zap.String("DomainName", domainName), - zap.String("VisibilityQuery", wfVersionEsQuery), - ) - return err - } - var domainWorkflowVersionCount DomainWorkflowVersionCount - err = json.Unmarshal(agg, &domainWorkflowVersionCount) - if err != nil { - logger.Error("ElasticSearch error parsing aggregation.", - zap.Error(err), - zap.String("Aggregation", string(agg)), - zap.String("DomainName", domainName), - zap.String("VisibilityQuery", wfVersionEsQuery), - ) - return err - } - for _, workflowType := range domainWorkflowVersionCount.WorkflowTypes { - for _, workflowVersion := range workflowType.WorkflowVersions.WorkflowVersions { - w.analyzer.tallyScope.Tagged( - map[string]string{domainTag: domainName, workflowVersionTag: workflowVersion.AggregateKey, workflowTypeTag: workflowType.AggregateKey}, - ).Gauge(workflowVersionCountMetrics).Update(float64(workflowVersion.AggregateCount)) - } - } +func (w *Workflow) emitWorkflowVersionMetricsES(ctx context.Context, domainName string, logger *zap.Logger) error { + wfVersionEsQuery, err := w.getWorkflowVersionQuery(domainName) + if err != nil { + logger.Error("Failed to get ElasticSearch query to find workflow version Info", + zap.Error(err), + zap.String("DomainName", domainName), + ) + return err + } + response, err := w.analyzer.esClient.SearchRaw(ctx, w.analyzer.visibilityIndexName, wfVersionEsQuery) + if err != nil { + logger.Error("Failed to query ElasticSearch to find workflow version Info", + zap.Error(err), + zap.String("VisibilityQuery", wfVersionEsQuery), + zap.String("DomainName", domainName), + ) + return err + } + agg, foundAggregation := response.Aggregations[workflowTypesAggKey] + + if !foundAggregation { + logger.Error("ElasticSearch error: aggregation failed.", + zap.Error(err), + zap.String("Aggregation", string(agg)), + zap.String("DomainName", domainName), + zap.String("VisibilityQuery", wfVersionEsQuery), + ) + return err + } + var domainWorkflowVersionCount DomainWorkflowVersionCount + err = json.Unmarshal(agg, &domainWorkflowVersionCount) + if err != nil { + logger.Error("ElasticSearch error parsing aggregation.", + zap.Error(err), + zap.String("Aggregation", string(agg)), + zap.String("DomainName", domainName), + zap.String("VisibilityQuery", wfVersionEsQuery), + ) + return err + } + for _, workflowType := range domainWorkflowVersionCount.WorkflowTypes { + for _, workflowVersion := range workflowType.WorkflowVersions.WorkflowVersions { + w.analyzer.tallyScope.Tagged( + map[string]string{domainTag: domainName, workflowVersionTag: workflowVersion.AggregateKey, workflowTypeTag: workflowType.AggregateKey}, + ).Gauge(workflowVersionCountMetrics).Update(float64(workflowVersion.AggregateCount)) } } return nil diff --git a/service/worker/service.go b/service/worker/service.go index 8e0dd06da2f..e173b73ddcc 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -283,6 +283,7 @@ func (s *Service) startESAnalyzer() { s.GetFrontendClient(), s.GetClientBean(), s.params.ESClient, + s.params.PinotClient, s.params.ESConfig, s.GetLogger(), s.params.MetricScope,