diff --git a/service/worker/esanalyzer/analyzer_test.go b/service/worker/esanalyzer/analyzer_test.go index dd9fc3401ed..b3c9ae05f8a 100644 --- a/service/worker/esanalyzer/analyzer_test.go +++ b/service/worker/esanalyzer/analyzer_test.go @@ -416,7 +416,7 @@ func TestEmitWorkflowTypeCountMetricsESErrorCases(t *testing.T) { Aggregations: map[string]json.RawMessage{}, }, nil).Times(1) }, - expectedErr: nil, + expectedErr: fmt.Errorf("aggregation failed for domain in ES: test-domain"), }, "Case4: error unmarshalling aggregation": { domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { @@ -501,7 +501,7 @@ func TestEmitWorkflowVersionMetricsESErrorCases(t *testing.T) { Aggregations: map[string]json.RawMessage{}, }, nil).Times(1) }, - expectedErr: nil, + expectedErr: fmt.Errorf("aggregation failed for domain in ES: test-domain"), }, "Case4: error unmarshalling aggregation": { domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { @@ -596,7 +596,7 @@ func TestEmitWorkflowTypeCountMetricsPinot(t *testing.T) { PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) { mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{}, nil).Times(1) }, - expectedErr: nil, + expectedErr: fmt.Errorf("aggregation failed for domain in Pinot: test-domain"), }, "Case4: error parsing workflow count": { domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { @@ -627,3 +627,149 @@ func TestEmitWorkflowTypeCountMetricsPinot(t *testing.T) { }) } } + +func TestEmitWorkflowVersionMetricsPinot(t *testing.T) { + mockPinotConfig := &config.PinotVisibilityConfig{ + Table: "test", + } + mockESConfig := &config.ElasticSearchConfig{ + Indices: map[string]string{ + common.VisibilityAppName: "test", + }, + } + + ctrl := gomock.NewController(t) + + mockPinotClient := pinot.NewMockGenericClient(ctrl) + mockDomainCache := cache.NewMockDomainCache(ctrl) + testAnalyzer := New(nil, nil, nil, nil, mockPinotClient, mockESConfig, mockPinotConfig, log.NewNoop(), tally.NoopScope, nil, mockDomainCache, nil) + testWorkflow := &Workflow{analyzer: testAnalyzer} + + tests := map[string]struct { + domainCacheAffordance func(mockDomainCache *cache.MockDomainCache) + PinotClientAffordance func(mockPinotClient *pinot.MockGenericClient) + expectedErr error + }{ + "Case0: success": { + domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest( + &persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil).Times(3) + }, + PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) { + mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{ + {"test-wf-type0", 100}, + {"test-wf-type1", 200}, + }, nil).Times(1) + mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{ + {"test-wf-version0", 1}, + {"test-wf-version1", 20}, + }, nil).Times(1) + mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{ + {"test-wf-version3", 10}, + {"test-wf-version4", 2}, + }, nil).Times(1) + }, + expectedErr: nil, + }, + "Case1: error getting domain": { + domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(nil, fmt.Errorf("domain error")).Times(1) + }, + PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {}, + expectedErr: fmt.Errorf("domain error"), + }, + "Case2: error Pinot query": { + domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest( + &persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil) + }, + PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) { + mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return(nil, fmt.Errorf("pinot error")).Times(1) + }, + expectedErr: fmt.Errorf("failed to query Pinot to find workflow type count Info: test-domain, error: pinot error"), + }, + "Case3: Aggregation is empty": { + domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest( + &persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil) + }, + PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) { + mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{}, nil).Times(1) + }, + expectedErr: fmt.Errorf("aggregation failed for domain in Pinot: test-domain"), + }, + "Case4: error parsing workflow count": { + domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest( + &persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil) + }, + PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) { + mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{ + {"test", "invalid"}, + }, nil).Times(1) + }, + expectedErr: fmt.Errorf("error parsing workflow count for workflow type test"), + }, + "Case5-1: failure case in queryWorkflowVersionsWithType: getWorkflowVersionPinotQuery error": { + domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest( + &persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil).Times(1) + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(nil, fmt.Errorf("domain error")).Times(1) + }, + PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) { + mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{ + {"test-wf-type0", 100}, + {"test-wf-type1", 200}, + }, nil).Times(1) + }, + expectedErr: fmt.Errorf("error querying workflow versions for workflow type: test-wf-type0: error: domain error"), + }, + "Case5-2: failure case in queryWorkflowVersionsWithType: SearchAggr error": { + domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest( + &persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil).Times(2) + }, + PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) { + mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{ + {"test-wf-type0", 100}, + {"test-wf-type1", 200}, + }, nil).Times(1) + mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return(nil, fmt.Errorf("pinot error")).Times(1) + }, + expectedErr: fmt.Errorf("error querying workflow versions for workflow type: test-wf-type0: error: pinot error"), + }, + "Case5-3: failure case in queryWorkflowVersionsWithType: error parsing workflow count": { + domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) { + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest( + &persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil).Times(2) + }, + PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) { + mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{ + {"test-wf-type0", 100}, + {"test-wf-type1", 200}, + }, nil).Times(1) + mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{ + {"test-wf-version0", 1.5}, + {"test-wf-version1", 20}, + }, nil).Times(1) + }, + expectedErr: fmt.Errorf("error querying workflow versions for workflow type: " + + "test-wf-type0: error: error parsing workflow count for workflow version test-wf-version0"), + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + // Set up mocks + test.domainCacheAffordance(mockDomainCache) + test.PinotClientAffordance(mockPinotClient) + + err := testWorkflow.emitWorkflowVersionMetricsPinot("test-domain", zap.NewNop()) + if err == nil { + assert.Equal(t, test.expectedErr, err) + } else { + assert.Equal(t, test.expectedErr.Error(), err.Error()) + } + }) + } +} diff --git a/service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go b/service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go index 864815e787a..cdd2cd6d6f6 100644 --- a/service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go +++ b/service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go @@ -213,7 +213,7 @@ func (w *Workflow) emitWorkflowTypeCountMetricsPinot(domainName string, logger * zap.String("DomainName", domainName), zap.String("VisibilityQuery", wfTypeCountPinotQuery), ) - return err + return fmt.Errorf("aggregation failed for domain in Pinot: %s", domainName) } var domainWorkflowTypeCount DomainWorkflowTypeCount for _, row := range response { @@ -267,7 +267,7 @@ func (w *Workflow) emitWorkflowTypeCountMetricsES(ctx context.Context, domainNam zap.String("DomainName", domainName), zap.String("VisibilityQuery", wfTypeCountEsQuery), ) - return err + return fmt.Errorf("aggregation failed for domain in ES: %s", domainName) } var domainWorkflowTypeCount DomainWorkflowTypeCount err = json.Unmarshal(agg, &domainWorkflowTypeCount) diff --git a/service/worker/esanalyzer/workflow.go b/service/worker/esanalyzer/workflow.go index c3bc68866f7..be1dcd80547 100644 --- a/service/worker/esanalyzer/workflow.go +++ b/service/worker/esanalyzer/workflow.go @@ -31,6 +31,8 @@ import ( cclient "go.uber.org/cadence/client" "go.uber.org/cadence/workflow" "go.uber.org/zap" + + "github.com/uber/cadence/common/pinot" ) const ( @@ -146,6 +148,47 @@ func (w *Workflow) getWorkflowVersionQuery(domainName string) (string, error) { `, domain.GetInfo().ID), nil } +func (w *Workflow) getWorkflowTypePinotQuery(domainName string) (string, error) { + domain, err := w.analyzer.domainCache.GetDomain(domainName) + if err != nil { + return "", err + } + // exclude uninitialized workflow executions by checking whether record has start time field + // there's a "LIMIT 10" because in ES, Aggr clause by default returns the top 10 results + return fmt.Sprintf(` +SELECT WorkflowType, COUNT(*) AS count +FROM %s +WHERE DomainID = '%s' + AND CloseStatus = -1 + AND StartTime > 0 +GROUP BY WorkflowType +ORDER BY count DESC +LIMIT 10 +OFFSET 0 + `, w.analyzer.pinotTableName, domain.GetInfo().ID), nil +} + +func (w *Workflow) getWorkflowVersionPinotQuery(domainName string, wfType string) (string, error) { + domain, err := w.analyzer.domainCache.GetDomain(domainName) + if err != nil { + return "", err + } + // exclude uninitialized workflow executions by checking whether record has start time field + // there's a "LIMIT 10" because in ES, Aggr clause by default returns the top 10 results + return fmt.Sprintf(` +SELECT JSON_EXTRACT_SCALAR(Attr, '$.CadenceChangeVersion', 'STRING_ARRAY') AS CadenceChangeVersion, COUNT(*) AS count +FROM %s +WHERE DomainID = '%s' + AND CloseStatus = -1 + AND StartTime > 0 + AND WorkflowType = '%s' +GROUP BY JSON_EXTRACT_SCALAR(Attr, '$.CadenceChangeVersion', 'STRING_ARRAY') AS CadenceChangeVersion +ORDER BY count DESC +LIMIT 10 +OFFSET 0 + `, w.analyzer.pinotTableName, domain.GetInfo().ID, wfType), nil +} + // emitWorkflowVersionMetrics is an activity that emits the running WF versions of a domain func (w *Workflow) emitWorkflowVersionMetrics(ctx context.Context) error { logger := activity.GetLogger(ctx) @@ -160,6 +203,8 @@ func (w *Workflow) emitWorkflowVersionMetrics(ctx context.Context) error { switch w.analyzer.readMode { case ES: err = w.emitWorkflowVersionMetricsES(ctx, domainName, logger) + case Pinot: + err = w.emitWorkflowVersionMetricsPinot(domainName, logger) default: err = w.emitWorkflowVersionMetricsES(ctx, domainName, logger) } @@ -171,6 +216,123 @@ func (w *Workflow) emitWorkflowVersionMetrics(ctx context.Context) error { return nil } +func (w *Workflow) emitWorkflowVersionMetricsPinot(domainName string, logger *zap.Logger) error { + wfVersionPinotQuery, err := w.getWorkflowTypePinotQuery(domainName) + if err != nil { + logger.Error("Failed to get Pinot query to find workflow type Info", + zap.Error(err), + zap.String("DomainName", domainName), + ) + return err + } + response, err := w.analyzer.pinotClient.SearchAggr(&pinot.SearchRequest{Query: wfVersionPinotQuery}) + if err != nil { + logger.Error("Failed to query Pinot to find workflow type count Info", + zap.Error(err), + zap.String("VisibilityQuery", wfVersionPinotQuery), + zap.String("DomainName", domainName), + ) + return fmt.Errorf("failed to query Pinot to find workflow type count Info: %s, error: %s", domainName, err.Error()) + } + foundAggregation := len(response) > 0 + + if !foundAggregation { + logger.Error("Pinot error: aggregation failed.", + zap.Error(err), + zap.String("Aggregation", fmt.Sprintf("%v", response)), + zap.String("DomainName", domainName), + zap.String("VisibilityQuery", wfVersionPinotQuery), + ) + return fmt.Errorf("aggregation failed for domain in Pinot: %s", domainName) + } + var domainWorkflowVersionCount DomainWorkflowVersionCount + for _, row := range response { + workflowType := row[0].(string) + workflowCount, ok := row[1].(int) + if !ok { + logger.Error("Error parsing workflow count", + zap.Error(err), + zap.String("WorkflowType", workflowType), + zap.String("DomainName", domainName), + ) + return fmt.Errorf("error parsing workflow count for workflow type %s", workflowType) + } + workflowVersions, err := w.queryWorkflowVersionsWithType(domainName, workflowType, logger) + + if err != nil { + logger.Error("Error querying workflow versions", + zap.Error(err), + zap.String("WorkflowType", workflowType), + zap.String("DomainName", domainName), + ) + return fmt.Errorf("error querying workflow versions for workflow type: %s: error: %s", workflowType, err.Error()) + } + + domainWorkflowVersionCount.WorkflowTypes = append(domainWorkflowVersionCount.WorkflowTypes, WorkflowTypeCount{ + EsAggregateCount: EsAggregateCount{ + AggregateKey: workflowType, + AggregateCount: int64(workflowCount), + }, + WorkflowVersions: workflowVersions, + }) + } + + for _, workflowType := range domainWorkflowVersionCount.WorkflowTypes { + for _, workflowVersion := range workflowType.WorkflowVersions.WorkflowVersions { + w.analyzer.tallyScope.Tagged( + map[string]string{domainTag: domainName, workflowVersionTag: workflowVersion.AggregateKey, workflowTypeTag: workflowType.AggregateKey}, + ).Gauge(workflowVersionCountMetrics).Update(float64(workflowVersion.AggregateCount)) + } + } + return nil +} + +func (w *Workflow) queryWorkflowVersionsWithType(domainName string, wfType string, logger *zap.Logger) (WorkflowVersionCount, error) { + wfVersionPinotQuery, err := w.getWorkflowVersionPinotQuery(domainName, wfType) + if err != nil { + logger.Error("Failed to get Pinot query to find workflow version Info", + zap.Error(err), + zap.String("DomainName", domainName), + ) + return WorkflowVersionCount{}, err + } + + response, err := w.analyzer.pinotClient.SearchAggr(&pinot.SearchRequest{Query: wfVersionPinotQuery}) + if err != nil { + logger.Error("Failed to query Pinot to find workflow type count Info", + zap.Error(err), + zap.String("VisibilityQuery", wfVersionPinotQuery), + zap.String("DomainName", domainName), + ) + return WorkflowVersionCount{}, err + } + foundAggregation := len(response) > 0 + + // if no CadenceChangeVersion is found, return an empty WorkflowVersionCount, no errors + if !foundAggregation { + return WorkflowVersionCount{}, nil + } + + var workflowVersions WorkflowVersionCount + for _, row := range response { + workflowVersion := row[0].(string) + workflowCount, ok := row[1].(int) + if !ok { + logger.Error("Error parsing workflow count", + zap.Error(err), + zap.String("WorkflowVersion", workflowVersion), + zap.String("DomainName", domainName), + ) + return WorkflowVersionCount{}, fmt.Errorf("error parsing workflow count for workflow version %s", workflowVersion) + } + workflowVersions.WorkflowVersions = append(workflowVersions.WorkflowVersions, EsAggregateCount{ + AggregateKey: workflowVersion, + AggregateCount: int64(workflowCount), + }) + } + return workflowVersions, nil +} + func (w *Workflow) emitWorkflowVersionMetricsES(ctx context.Context, domainName string, logger *zap.Logger) error { wfVersionEsQuery, err := w.getWorkflowVersionQuery(domainName) if err != nil { @@ -198,7 +360,7 @@ func (w *Workflow) emitWorkflowVersionMetricsES(ctx context.Context, domainName zap.String("DomainName", domainName), zap.String("VisibilityQuery", wfVersionEsQuery), ) - return err + return fmt.Errorf("aggregation failed for domain in ES: %s", domainName) } var domainWorkflowVersionCount DomainWorkflowVersionCount err = json.Unmarshal(agg, &domainWorkflowVersionCount)