Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add emitWorkflowVersionMetrics for pinot #6190

Merged
merged 5 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 149 additions & 3 deletions service/worker/esanalyzer/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func TestEmitWorkflowTypeCountMetricsESErrorCases(t *testing.T) {
Aggregations: map[string]json.RawMessage{},
}, nil).Times(1)
},
expectedErr: nil,
expectedErr: fmt.Errorf("aggregation failed for domain in ES: test-domain"),
},
"Case4: error unmarshalling aggregation": {
domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
Expand Down Expand Up @@ -501,7 +501,7 @@ func TestEmitWorkflowVersionMetricsESErrorCases(t *testing.T) {
Aggregations: map[string]json.RawMessage{},
}, nil).Times(1)
},
expectedErr: nil,
expectedErr: fmt.Errorf("aggregation failed for domain in ES: test-domain"),
},
"Case4: error unmarshalling aggregation": {
domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
Expand Down Expand Up @@ -596,7 +596,7 @@ func TestEmitWorkflowTypeCountMetricsPinot(t *testing.T) {
PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{}, nil).Times(1)
},
expectedErr: nil,
expectedErr: fmt.Errorf("aggregation failed for domain in Pinot: test-domain"),
},
"Case4: error parsing workflow count": {
domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
Expand Down Expand Up @@ -627,3 +627,149 @@ func TestEmitWorkflowTypeCountMetricsPinot(t *testing.T) {
})
}
}

// TestEmitWorkflowVersionMetricsPinot drives emitWorkflowVersionMetricsPinot
// through its success path and each of its error returns, using a mocked
// domain cache and a mocked Pinot client.
//
// Every case registers its expectations on the shared mocks right before the
// call under test, so the number of expected GetDomain / SearchAggr calls in a
// case mirrors exactly how far the function is supposed to get.
func TestEmitWorkflowVersionMetricsPinot(t *testing.T) {
	mockPinotConfig := &config.PinotVisibilityConfig{
		Table: "test",
	}
	mockESConfig := &config.ElasticSearchConfig{
		Indices: map[string]string{
			common.VisibilityAppName: "test",
		},
	}

	ctrl := gomock.NewController(t)

	mockPinotClient := pinot.NewMockGenericClient(ctrl)
	mockDomainCache := cache.NewMockDomainCache(ctrl)
	testAnalyzer := New(nil, nil, nil, nil, mockPinotClient, mockESConfig, mockPinotConfig, log.NewNoop(), tally.NoopScope, nil, mockDomainCache, nil)
	testWorkflow := &Workflow{analyzer: testAnalyzer}

	tests := map[string]struct {
		domainCacheAffordance func(mockDomainCache *cache.MockDomainCache)
		PinotClientAffordance func(mockPinotClient *pinot.MockGenericClient)
		expectedErr           error
	}{
		"Case0: success": {
			domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
				// One lookup for the workflow-type query plus one per returned workflow type.
				mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest(
					&persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil).Times(3)
			},
			PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
				// First response: workflow types; the next two: versions for each type.
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
					{"test-wf-type0", 100},
					{"test-wf-type1", 200},
				}, nil).Times(1)
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
					{"test-wf-version0", 1},
					{"test-wf-version1", 20},
				}, nil).Times(1)
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
					{"test-wf-version3", 10},
					{"test-wf-version4", 2},
				}, nil).Times(1)
			},
			expectedErr: nil,
		},
		"Case1: error getting domain": {
			domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
				mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(nil, fmt.Errorf("domain error")).Times(1)
			},
			PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {},
			expectedErr:           fmt.Errorf("domain error"),
		},
		"Case2: error Pinot query": {
			domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
				mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest(
					&persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil)
			},
			PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return(nil, fmt.Errorf("pinot error")).Times(1)
			},
			expectedErr: fmt.Errorf("failed to query Pinot to find workflow type count Info: test-domain, error: pinot error"),
		},
		"Case3: Aggregation is empty": {
			domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
				mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest(
					&persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil)
			},
			PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{}, nil).Times(1)
			},
			expectedErr: fmt.Errorf("aggregation failed for domain in Pinot: test-domain"),
		},
		"Case4: error parsing workflow count": {
			domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
				mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest(
					&persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil)
			},
			PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
				// The count column is a string, so the int assertion in the code under test fails.
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
					{"test", "invalid"},
				}, nil).Times(1)
			},
			expectedErr: fmt.Errorf("error parsing workflow count for workflow type test"),
		},
		"Case5-1: failure case in queryWorkflowVersionsWithType: getWorkflowVersionPinotQuery error": {
			domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
				// First lookup (type query) succeeds, second lookup (version query) fails.
				mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest(
					&persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil).Times(1)
				mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(nil, fmt.Errorf("domain error")).Times(1)
			},
			PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
					{"test-wf-type0", 100},
					{"test-wf-type1", 200},
				}, nil).Times(1)
			},
			expectedErr: fmt.Errorf("error querying workflow versions for workflow type: test-wf-type0: error: domain error"),
		},
		"Case5-2: failure case in queryWorkflowVersionsWithType: SearchAggr error": {
			domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
				mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest(
					&persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil).Times(2)
			},
			PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
					{"test-wf-type0", 100},
					{"test-wf-type1", 200},
				}, nil).Times(1)
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return(nil, fmt.Errorf("pinot error")).Times(1)
			},
			expectedErr: fmt.Errorf("error querying workflow versions for workflow type: test-wf-type0: error: pinot error"),
		},
		"Case5-3: failure case in queryWorkflowVersionsWithType: error parsing workflow count": {
			domainCacheAffordance: func(mockDomainCache *cache.MockDomainCache) {
				mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewDomainCacheEntryForTest(
					&persistence.DomainInfo{ID: "test-id"}, nil, false, nil, 0, nil), nil).Times(2)
			},
			PinotClientAffordance: func(mockPinotClient *pinot.MockGenericClient) {
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
					{"test-wf-type0", 100},
					{"test-wf-type1", 200},
				}, nil).Times(1)
				// 1.5 is a float64, so the int assertion on the version count fails.
				mockPinotClient.EXPECT().SearchAggr(gomock.Any()).Return([][]interface{}{
					{"test-wf-version0", 1.5},
					{"test-wf-version1", 20},
				}, nil).Times(1)
			},
			expectedErr: fmt.Errorf("error querying workflow versions for workflow type: " +
				"test-wf-type0: error: error parsing workflow count for workflow version test-wf-version0"),
		},
	}

	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			// Set up mocks
			test.domainCacheAffordance(mockDomainCache)
			test.PinotClientAffordance(mockPinotClient)

			err := testWorkflow.emitWorkflowVersionMetricsPinot("test-domain", zap.NewNop())

			// Branch on the expectation rather than on the result: calling
			// expectedErr.Error() on a nil expectation would panic when the
			// function unexpectedly returns an error, hiding the real failure.
			if test.expectedErr == nil {
				assert.NoError(t, err)
				return
			}
			assert.EqualError(t, err, test.expectedErr.Error())
		})
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (w *Workflow) emitWorkflowTypeCountMetricsPinot(domainName string, logger *
zap.String("DomainName", domainName),
zap.String("VisibilityQuery", wfTypeCountPinotQuery),
)
return err
return fmt.Errorf("aggregation failed for domain in Pinot: %s", domainName)
}
var domainWorkflowTypeCount DomainWorkflowTypeCount
for _, row := range response {
Expand Down Expand Up @@ -267,7 +267,7 @@ func (w *Workflow) emitWorkflowTypeCountMetricsES(ctx context.Context, domainNam
zap.String("DomainName", domainName),
zap.String("VisibilityQuery", wfTypeCountEsQuery),
)
return err
return fmt.Errorf("aggregation failed for domain in ES: %s", domainName)
}
var domainWorkflowTypeCount DomainWorkflowTypeCount
err = json.Unmarshal(agg, &domainWorkflowTypeCount)
Expand Down
164 changes: 163 additions & 1 deletion service/worker/esanalyzer/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
cclient "go.uber.org/cadence/client"
"go.uber.org/cadence/workflow"
"go.uber.org/zap"

"github.com/uber/cadence/common/pinot"
)

const (
Expand Down Expand Up @@ -146,6 +148,47 @@
`, domain.GetInfo().ID), nil
}

// getWorkflowTypePinotQuery builds the Pinot SQL that counts executions per
// WorkflowType for the given domain.
//
// domainName is resolved to a domain ID through the analyzer's domain cache;
// if that lookup fails the error is returned unchanged together with an empty
// query string. The table name comes from w.analyzer.pinotTableName.
//
// The generated query keeps only rows with CloseStatus = -1 and StartTime > 0,
// groups by WorkflowType, orders by count descending and returns the first 10
// rows.
func (w *Workflow) getWorkflowTypePinotQuery(domainName string) (string, error) {
domain, err := w.analyzer.domainCache.GetDomain(domainName)
if err != nil {
return "", err
}
// exclude uninitialized workflow executions by checking whether record has start time field
// there's a "LIMIT 10" because in ES, Aggr clause by default returns the top 10 results
// The domain ID is wrapped in single quotes inside the query text.
return fmt.Sprintf(`
SELECT WorkflowType, COUNT(*) AS count
FROM %s
WHERE DomainID = '%s'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: %q to replace '%s' according to https://pkg.go.dev/fmt

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For strings, %q returns a double-quoted string safely escaped with Go syntax, but in Pinot, Where DomainID = "" doesn't work. It has to be single quoted.

AND CloseStatus = -1
AND StartTime > 0
GROUP BY WorkflowType
ORDER BY count DESC
LIMIT 10
OFFSET 0
`, w.analyzer.pinotTableName, domain.GetInfo().ID), nil
}

// getWorkflowVersionPinotQuery renders the Pinot SQL that counts executions of
// a single workflow type within a domain, bucketed by the CadenceChangeVersion
// value extracted from the Attr JSON column.
//
// domainName is resolved to a domain ID via the analyzer's domain cache; when
// the lookup fails, that error is handed back untouched with an empty query.
// wfType is substituted verbatim into the WorkflowType filter.
//
// NOTE(review): the GROUP BY clause repeats the "AS CadenceChangeVersion" alias
// from the SELECT list — confirm Pinot accepts an alias in that position.
// NOTE(review): wfType is not escaped; a workflow type containing a single
// quote would break the generated statement — confirm callers cannot pass one.
func (w *Workflow) getWorkflowVersionPinotQuery(domainName string, wfType string) (string, error) {
	// Rows are required to have StartTime > 0 to leave out uninitialized workflow executions.
	// LIMIT 10 is used because in ES the Aggr clause returns the top 10 results by default.
	const queryTemplate = `
SELECT JSON_EXTRACT_SCALAR(Attr, '$.CadenceChangeVersion', 'STRING_ARRAY') AS CadenceChangeVersion, COUNT(*) AS count
FROM %s
WHERE DomainID = '%s'
AND CloseStatus = -1
AND StartTime > 0
AND WorkflowType = '%s'
GROUP BY JSON_EXTRACT_SCALAR(Attr, '$.CadenceChangeVersion', 'STRING_ARRAY') AS CadenceChangeVersion
ORDER BY count DESC
LIMIT 10
OFFSET 0
`

	domainEntry, lookupErr := w.analyzer.domainCache.GetDomain(domainName)
	if lookupErr != nil {
		return "", lookupErr
	}

	domainID := domainEntry.GetInfo().ID
	query := fmt.Sprintf(queryTemplate, w.analyzer.pinotTableName, domainID, wfType)
	return query, nil
}

// emitWorkflowVersionMetrics is an activity that emits the running WF versions of a domain
func (w *Workflow) emitWorkflowVersionMetrics(ctx context.Context) error {
logger := activity.GetLogger(ctx)
Expand All @@ -160,6 +203,8 @@
switch w.analyzer.readMode {
case ES:
err = w.emitWorkflowVersionMetricsES(ctx, domainName, logger)
case Pinot:
err = w.emitWorkflowVersionMetricsPinot(domainName, logger)

Check warning on line 207 in service/worker/esanalyzer/workflow.go

View check run for this annotation

Codecov / codecov/patch

service/worker/esanalyzer/workflow.go#L206-L207

Added lines #L206 - L207 were not covered by tests
default:
err = w.emitWorkflowVersionMetricsES(ctx, domainName, logger)
}
Expand All @@ -171,6 +216,123 @@
return nil
}

// emitWorkflowVersionMetricsPinot emits, for one domain, a gauge per
// (workflow type, workflow version) pair using counts read from Pinot.
//
// It runs the workflow-type count query built by getWorkflowTypePinotQuery,
// then calls queryWorkflowVersionsWithType once for every returned workflow
// type, and finally updates workflowVersionCountMetrics on the analyzer's
// tally scope, tagged with the domain, workflow version and workflow type.
//
// An error is returned when the query cannot be built, when Pinot returns an
// error or zero rows, when a row's count is not an int, or when the per-type
// version lookup fails. Gauges are only updated after every row has been
// processed, so none is emitted on any of those error paths.
func (w *Workflow) emitWorkflowVersionMetricsPinot(domainName string, logger *zap.Logger) error {
// NOTE: despite its name, wfVersionPinotQuery holds the workflow *type* count query.
wfVersionPinotQuery, err := w.getWorkflowTypePinotQuery(domainName)
if err != nil {
logger.Error("Failed to get Pinot query to find workflow type Info",
zap.Error(err),
zap.String("DomainName", domainName),
)
return err
}
response, err := w.analyzer.pinotClient.SearchAggr(&pinot.SearchRequest{Query: wfVersionPinotQuery})
if err != nil {
logger.Error("Failed to query Pinot to find workflow type count Info",
zap.Error(err),
zap.String("VisibilityQuery", wfVersionPinotQuery),
zap.String("DomainName", domainName),
)
return fmt.Errorf("failed to query Pinot to find workflow type count Info: %s, error: %s", domainName, err.Error())
}
foundAggregation := len(response) > 0

// An empty result set is reported as an error for the domain.
if !foundAggregation {
// err is always nil here (the non-nil case returned above), so zap.Error(err) carries no information.
logger.Error("Pinot error: aggregation failed.",
zap.Error(err),
zap.String("Aggregation", fmt.Sprintf("%v", response)),
zap.String("DomainName", domainName),
zap.String("VisibilityQuery", wfVersionPinotQuery),
)
return fmt.Errorf("aggregation failed for domain in Pinot: %s", domainName)
}
var domainWorkflowVersionCount DomainWorkflowVersionCount
for _, row := range response {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10x latency might be an issue for metrics emission. Could you parallelize it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do this in parallel with multiple threads, is there a risk when metrics are emitted, the workflow still doesn't have all the data?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This metrics doesn't care about the latency, since we run it every 5 or 10 minutes. But we can eliminate the calls when we aggregate by both version and type

Copy link
Member Author

@bowenxia bowenxia Jul 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed this with Ender offline as well. We are going to keep this approach.

// NOTE(review): row[0].(string) is an unchecked type assertion and row[1] an
// unchecked index; a row with fewer than two columns or a non-string first
// column panics here instead of producing an error.
workflowType := row[0].(string)
workflowCount, ok := row[1].(int)
if !ok {
// err is still nil at this point; the log entry's error field is empty.
logger.Error("Error parsing workflow count",
zap.Error(err),
zap.String("WorkflowType", workflowType),
zap.String("DomainName", domainName),
)
return fmt.Errorf("error parsing workflow count for workflow type %s", workflowType)
}
// One additional Pinot query per workflow type.
workflowVersions, err := w.queryWorkflowVersionsWithType(domainName, workflowType, logger)

if err != nil {
logger.Error("Error querying workflow versions",
zap.Error(err),
zap.String("WorkflowType", workflowType),
zap.String("DomainName", domainName),
)
return fmt.Errorf("error querying workflow versions for workflow type: %s: error: %s", workflowType, err.Error())
}

domainWorkflowVersionCount.WorkflowTypes = append(domainWorkflowVersionCount.WorkflowTypes, WorkflowTypeCount{
EsAggregateCount: EsAggregateCount{
AggregateKey: workflowType,
AggregateCount: int64(workflowCount),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

workflowCount is from first call; this will be different from the summation of counts from subsequent calls by workflowtypes. But you could instead use the summation to be at least self consistent.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's one sample result from ES:

{ "key": "UpfrontChargeWorkflow::start", "doc_count": 182, "versions": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": "waitForPSPCallback-1", "doc_count": 149 } ] } },

The count of workflow type is different from the summation of the counts of CadenceChangeVersions. I was thinking if this is designed on purpose.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we group by WorkflowType and CadenceChangeVersion, so it can have the count per version and per type. I tried and it is working

SELECT JSON_EXTRACT_SCALAR(Attr, '$.CadenceChangeVersion', 'STRING_ARRAY') AS CadenceChangeVersion, COUNT(*) AS count, workflowtype
FROM rta.rta.cadence_visibility_production
WHERE IsDeleted = false
  AND CloseStatus = -1
  AND StartTime > 0
  AND JSON_EXTRACT_SCALAR(Attr, '$.CadenceChangeVersion', 'STRING_ARRAY') IS NOT NULL
GROUP BY JSON_EXTRACT_SCALAR(Attr, '$.CadenceChangeVersion', 'STRING_ARRAY'), workflowtype
ORDER BY count DESC

Copy link
Member Author

@bowenxia bowenxia Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That query means to count all the workflowTypes which has CadenceChangeVersion. This is different from the ES result. For that ES query, it means to first, find the top 10 workflow types by count, and then, within these 10 workflow types, identify the top 10 CadenceChangeVersions count for each.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline, group by version and type will filter the records without CadenceChangeVersion. Need to verify if we need to emit that count, if not we can go with this approach.

},
WorkflowVersions: workflowVersions,
})
}

// Emit one gauge per (workflow type, workflow version) pair; the per-type
// AggregateCount collected above is not emitted by this loop.
for _, workflowType := range domainWorkflowVersionCount.WorkflowTypes {
for _, workflowVersion := range workflowType.WorkflowVersions.WorkflowVersions {
w.analyzer.tallyScope.Tagged(
map[string]string{domainTag: domainName, workflowVersionTag: workflowVersion.AggregateKey, workflowTypeTag: workflowType.AggregateKey},
).Gauge(workflowVersionCountMetrics).Update(float64(workflowVersion.AggregateCount))
}
}
return nil
}

// queryWorkflowVersionsWithType asks Pinot for the per-version execution
// counts of one workflow type in a domain and returns them as a
// WorkflowVersionCount.
//
// The query text comes from getWorkflowVersionPinotQuery. Errors from building
// the query or from SearchAggr are logged and returned unchanged. A response
// with zero rows is not an error: an empty WorkflowVersionCount is returned.
// A row whose count column is not an int yields an error and an empty result.
func (w *Workflow) queryWorkflowVersionsWithType(domainName string, wfType string, logger *zap.Logger) (WorkflowVersionCount, error) {
wfVersionPinotQuery, err := w.getWorkflowVersionPinotQuery(domainName, wfType)
if err != nil {
logger.Error("Failed to get Pinot query to find workflow version Info",
zap.Error(err),
zap.String("DomainName", domainName),
)
return WorkflowVersionCount{}, err
}

response, err := w.analyzer.pinotClient.SearchAggr(&pinot.SearchRequest{Query: wfVersionPinotQuery})
if err != nil {
// NOTE(review): the message below says "workflow type count" although this is the version query.
logger.Error("Failed to query Pinot to find workflow type count Info",
zap.Error(err),
zap.String("VisibilityQuery", wfVersionPinotQuery),
zap.String("DomainName", domainName),
)
return WorkflowVersionCount{}, err
}
foundAggregation := len(response) > 0

// if no CadenceChangeVersion is found, return an empty WorkflowVersionCount, no errors
if !foundAggregation {
return WorkflowVersionCount{}, nil

Check warning on line 313 in service/worker/esanalyzer/workflow.go

View check run for this annotation

Codecov / codecov/patch

service/worker/esanalyzer/workflow.go#L313

Added line #L313 was not covered by tests
}

var workflowVersions WorkflowVersionCount
for _, row := range response {
// NOTE(review): row[0].(string) is an unchecked type assertion and row[1] an
// unchecked index; a malformed row panics here instead of returning an error.
workflowVersion := row[0].(string)
workflowCount, ok := row[1].(int)
if !ok {
// err is nil at this point (the non-nil case returned above), so zap.Error(err) is empty.
logger.Error("Error parsing workflow count",
zap.Error(err),
zap.String("WorkflowVersion", workflowVersion),
zap.String("DomainName", domainName),
)
return WorkflowVersionCount{}, fmt.Errorf("error parsing workflow count for workflow version %s", workflowVersion)
}
workflowVersions.WorkflowVersions = append(workflowVersions.WorkflowVersions, EsAggregateCount{
AggregateKey: workflowVersion,
AggregateCount: int64(workflowCount),
})
}
return workflowVersions, nil
}

func (w *Workflow) emitWorkflowVersionMetricsES(ctx context.Context, domainName string, logger *zap.Logger) error {
wfVersionEsQuery, err := w.getWorkflowVersionQuery(domainName)
if err != nil {
Expand Down Expand Up @@ -198,7 +360,7 @@
zap.String("DomainName", domainName),
zap.String("VisibilityQuery", wfVersionEsQuery),
)
return err
return fmt.Errorf("aggregation failed for domain in ES: %s", domainName)
}
var domainWorkflowVersionCount DomainWorkflowVersionCount
err = json.Unmarshal(agg, &domainWorkflowVersionCount)
Expand Down
Loading