Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor everywhere uses ESClient to have a Switch #6168

Merged
merged 6 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions service/worker/esanalyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,27 @@
es "github.com/uber/cadence/common/elasticsearch"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/pinot"
"github.com/uber/cadence/common/resource"
"github.com/uber/cadence/service/worker/workercommon"
)

type readMode string

const (
Pinot readMode = "pinot"
ES readMode = "es"
)

type (
// Analyzer is the background sub-system to query ElasticSearch and execute mitigations
Analyzer struct {
svcClient workflowserviceclient.Interface
frontendClient frontend.Client
clientBean client.Bean
esClient es.GenericClient
pinotClient pinot.GenericClient
readMode readMode
logger log.Logger
tallyScope tally.Scope
visibilityIndexName string
Expand Down Expand Up @@ -98,18 +108,28 @@
frontendClient frontend.Client,
clientBean client.Bean,
esClient es.GenericClient,
pinotClient pinot.GenericClient,
esConfig *config.ElasticSearchConfig,
logger log.Logger,
tallyScope tally.Scope,
resource resource.Resource,
domainCache cache.DomainCache,
config *Config,
) *Analyzer {
var mode readMode
if esClient != nil {
mode = ES
} else if pinotClient != nil {
mode = Pinot

Check warning on line 123 in service/worker/esanalyzer/analyzer.go

View check run for this annotation

Codecov / codecov/patch

service/worker/esanalyzer/analyzer.go#L119-L123

Added lines #L119 - L123 were not covered by tests
}
Comment on lines +119 to +124
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this prefers ES if both are present:
could this mode be replaced by "just use the non-nil client" and only pass one?

or is there more stuff coming that'll use both sometimes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it can be. It's just that Pinot and ES has different GenericClients, there'll need a lot efforts to do a refactor.

Copy link
Member

@Groxx Groxx Jul 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aaah, because esClient is used by other things too, and there is / will be shadowing, so these fields are not just for this mode.
alrighty, makes sense 👍


return &Analyzer{
svcClient: svcClient,
frontendClient: frontendClient,
clientBean: clientBean,
esClient: esClient,
pinotClient: pinotClient,
readMode: mode,

Check warning on line 132 in service/worker/esanalyzer/analyzer.go

View check run for this annotation

Codecov / codecov/patch

service/worker/esanalyzer/analyzer.go#L131-L132

Added lines #L131 - L132 were not covered by tests
logger: logger,
tallyScope: tallyScope,
visibilityIndexName: esConfig.Indices[common.VisibilityAppName],
Expand Down
93 changes: 53 additions & 40 deletions service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,51 +146,64 @@
return err
}
for _, domainName := range workflowMetricDomainNames {
wfTypeCountEsQuery, err := w.getDomainWorkflowTypeCountQuery(domainName)
if err != nil {
logger.Error("Failed to get ElasticSearch query to find domain workflow type Info",
zap.Error(err),
zap.String("DomainName", domainName),
)
return err
switch w.analyzer.readMode {
case ES:
err = w.emitWorkflowTypeCountMetricsES(ctx, domainName, logger)

Check warning on line 151 in service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go

View check run for this annotation

Codecov / codecov/patch

service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go#L150-L151

Added lines #L150 - L151 were not covered by tests
default:
err = w.emitWorkflowTypeCountMetricsES(ctx, domainName, logger)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both cases doing to same thing. did you mean to emit a metric for pinot here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I'll add pinot in the next PR

}
response, err := w.analyzer.esClient.SearchRaw(ctx, w.analyzer.visibilityIndexName, wfTypeCountEsQuery)
if err != nil {
logger.Error("Failed to query ElasticSearch to find workflow type count Info",
zap.Error(err),
zap.String("VisibilityQuery", wfTypeCountEsQuery),
zap.String("DomainName", domainName),
)
return err
}
agg, foundAggregation := response.Aggregations[workflowTypesAggKey]

if !foundAggregation {
logger.Error("ElasticSearch error: aggregation failed.",
zap.Error(err),
zap.String("Aggregation", string(agg)),
zap.String("DomainName", domainName),
zap.String("VisibilityQuery", wfTypeCountEsQuery),
)
return err
}
var domainWorkflowTypeCount DomainWorkflowTypeCount
err = json.Unmarshal(agg, &domainWorkflowTypeCount)
if err != nil {
logger.Error("ElasticSearch error parsing aggregation.",
zap.Error(err),
zap.String("Aggregation", string(agg)),
zap.String("DomainName", domainName),
zap.String("VisibilityQuery", wfTypeCountEsQuery),
)
return err
}
for _, workflowType := range domainWorkflowTypeCount.WorkflowTypes {
w.analyzer.tallyScope.Tagged(
map[string]string{domainTag: domainName, workflowTypeTag: workflowType.AggregateKey},
).Gauge(workflowTypeCountMetrics).Update(float64(workflowType.AggregateCount))
}
}
}
return nil
}

func (w *Workflow) emitWorkflowTypeCountMetricsES(ctx context.Context, domainName string, logger *zap.Logger) error {
wfTypeCountEsQuery, err := w.getDomainWorkflowTypeCountQuery(domainName)
if err != nil {
logger.Error("Failed to get ElasticSearch query to find domain workflow type Info",
zap.Error(err),
zap.String("DomainName", domainName),
)
return err

Check warning on line 170 in service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go

View check run for this annotation

Codecov / codecov/patch

service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go#L166-L170

Added lines #L166 - L170 were not covered by tests
}
response, err := w.analyzer.esClient.SearchRaw(ctx, w.analyzer.visibilityIndexName, wfTypeCountEsQuery)
if err != nil {
logger.Error("Failed to query ElasticSearch to find workflow type count Info",
zap.Error(err),
zap.String("VisibilityQuery", wfTypeCountEsQuery),
zap.String("DomainName", domainName),
)
return err

Check warning on line 179 in service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go

View check run for this annotation

Codecov / codecov/patch

service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go#L174-L179

Added lines #L174 - L179 were not covered by tests
}
agg, foundAggregation := response.Aggregations[workflowTypesAggKey]

if !foundAggregation {
logger.Error("ElasticSearch error: aggregation failed.",
zap.Error(err),
zap.String("Aggregation", string(agg)),
zap.String("DomainName", domainName),
zap.String("VisibilityQuery", wfTypeCountEsQuery),
)
return err

Check warning on line 190 in service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go

View check run for this annotation

Codecov / codecov/patch

service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go#L184-L190

Added lines #L184 - L190 were not covered by tests
}
var domainWorkflowTypeCount DomainWorkflowTypeCount
err = json.Unmarshal(agg, &domainWorkflowTypeCount)
if err != nil {
logger.Error("ElasticSearch error parsing aggregation.",
zap.Error(err),
zap.String("Aggregation", string(agg)),
zap.String("DomainName", domainName),
zap.String("VisibilityQuery", wfTypeCountEsQuery),
)
return err

Check warning on line 201 in service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go

View check run for this annotation

Codecov / codecov/patch

service/worker/esanalyzer/domainWorkflowTypeCountWorkflow.go#L195-L201

Added lines #L195 - L201 were not covered by tests
}
for _, workflowType := range domainWorkflowTypeCount.WorkflowTypes {
w.analyzer.tallyScope.Tagged(
map[string]string{domainTag: domainName, workflowTypeTag: workflowType.AggregateKey},
).Gauge(workflowTypeCountMetrics).Update(float64(workflowType.AggregateCount))
}
return nil
}
95 changes: 54 additions & 41 deletions service/worker/esanalyzer/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,52 +157,65 @@
return err
}
for _, domainName := range workflowMetricDomainNames {
wfVersionEsQuery, err := w.getWorkflowVersionQuery(domainName)
if err != nil {
logger.Error("Failed to get ElasticSearch query to find workflow version Info",
zap.Error(err),
zap.String("DomainName", domainName),
)
return err
switch w.analyzer.readMode {
case ES:
err = w.emitWorkflowVersionMetricsES(ctx, domainName, logger)

Check warning on line 162 in service/worker/esanalyzer/workflow.go

View check run for this annotation

Codecov / codecov/patch

service/worker/esanalyzer/workflow.go#L161-L162

Added lines #L161 - L162 were not covered by tests
default:
err = w.emitWorkflowVersionMetricsES(ctx, domainName, logger)
}
response, err := w.analyzer.esClient.SearchRaw(ctx, w.analyzer.visibilityIndexName, wfVersionEsQuery)
if err != nil {
logger.Error("Failed to query ElasticSearch to find workflow version Info",
zap.Error(err),
zap.String("VisibilityQuery", wfVersionEsQuery),
zap.String("DomainName", domainName),
)
return err
}
agg, foundAggregation := response.Aggregations[workflowTypesAggKey]
}
}
return nil
}

if !foundAggregation {
logger.Error("ElasticSearch error: aggregation failed.",
zap.Error(err),
zap.String("Aggregation", string(agg)),
zap.String("DomainName", domainName),
zap.String("VisibilityQuery", wfVersionEsQuery),
)
return err
}
var domainWorkflowVersionCount DomainWorkflowVersionCount
err = json.Unmarshal(agg, &domainWorkflowVersionCount)
if err != nil {
logger.Error("ElasticSearch error parsing aggregation.",
zap.Error(err),
zap.String("Aggregation", string(agg)),
zap.String("DomainName", domainName),
zap.String("VisibilityQuery", wfVersionEsQuery),
)
return err
}
for _, workflowType := range domainWorkflowVersionCount.WorkflowTypes {
for _, workflowVersion := range workflowType.WorkflowVersions.WorkflowVersions {
w.analyzer.tallyScope.Tagged(
map[string]string{domainTag: domainName, workflowVersionTag: workflowVersion.AggregateKey, workflowTypeTag: workflowType.AggregateKey},
).Gauge(workflowVersionCountMetrics).Update(float64(workflowVersion.AggregateCount))
}
}
func (w *Workflow) emitWorkflowVersionMetricsES(ctx context.Context, domainName string, logger *zap.Logger) error {
wfVersionEsQuery, err := w.getWorkflowVersionQuery(domainName)
if err != nil {
logger.Error("Failed to get ElasticSearch query to find workflow version Info",
zap.Error(err),
zap.String("DomainName", domainName),
)
return err

Check warning on line 181 in service/worker/esanalyzer/workflow.go

View check run for this annotation

Codecov / codecov/patch

service/worker/esanalyzer/workflow.go#L177-L181

Added lines #L177 - L181 were not covered by tests
}
response, err := w.analyzer.esClient.SearchRaw(ctx, w.analyzer.visibilityIndexName, wfVersionEsQuery)
if err != nil {
logger.Error("Failed to query ElasticSearch to find workflow version Info",
zap.Error(err),
zap.String("VisibilityQuery", wfVersionEsQuery),
zap.String("DomainName", domainName),
)
return err

Check warning on line 190 in service/worker/esanalyzer/workflow.go

View check run for this annotation

Codecov / codecov/patch

service/worker/esanalyzer/workflow.go#L185-L190

Added lines #L185 - L190 were not covered by tests
}
agg, foundAggregation := response.Aggregations[workflowTypesAggKey]

if !foundAggregation {
logger.Error("ElasticSearch error: aggregation failed.",
zap.Error(err),
zap.String("Aggregation", string(agg)),
zap.String("DomainName", domainName),
zap.String("VisibilityQuery", wfVersionEsQuery),
)
return err

Check warning on line 201 in service/worker/esanalyzer/workflow.go

View check run for this annotation

Codecov / codecov/patch

service/worker/esanalyzer/workflow.go#L195-L201

Added lines #L195 - L201 were not covered by tests
}
var domainWorkflowVersionCount DomainWorkflowVersionCount
err = json.Unmarshal(agg, &domainWorkflowVersionCount)
if err != nil {
logger.Error("ElasticSearch error parsing aggregation.",
zap.Error(err),
zap.String("Aggregation", string(agg)),
zap.String("DomainName", domainName),
zap.String("VisibilityQuery", wfVersionEsQuery),
)
return err

Check warning on line 212 in service/worker/esanalyzer/workflow.go

View check run for this annotation

Codecov / codecov/patch

service/worker/esanalyzer/workflow.go#L206-L212

Added lines #L206 - L212 were not covered by tests
}
for _, workflowType := range domainWorkflowVersionCount.WorkflowTypes {
for _, workflowVersion := range workflowType.WorkflowVersions.WorkflowVersions {
w.analyzer.tallyScope.Tagged(
map[string]string{domainTag: domainName, workflowVersionTag: workflowVersion.AggregateKey, workflowTypeTag: workflowType.AggregateKey},
).Gauge(workflowVersionCountMetrics).Update(float64(workflowVersion.AggregateCount))
}
}
return nil
Expand Down
1 change: 1 addition & 0 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ func (s *Service) startESAnalyzer() {
s.GetFrontendClient(),
s.GetClientBean(),
s.params.ESClient,
s.params.PinotClient,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it's fine for this PR but as a general pattern we should avoid passing multiple client objects to a component and having it decide based on which one is nil. Ideally there would be one parameter here of visibilityClient type or something like that and the bootstrap code (this file) determines which one to pass there.

s.params.ESConfig,
s.GetLogger(),
s.params.MetricScope,
Expand Down
Loading