Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use sdk client instead of rpc client #2192

Merged
merged 1 commit into from
Jul 10, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 16 additions & 75 deletions service/worker/batcher/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@ import (
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"go.uber.org/cadence"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/activity"
cclient "go.uber.org/cadence/client"
"go.uber.org/cadence/workflow"
"golang.org/x/time/rate"
)
Expand Down Expand Up @@ -194,6 +193,7 @@ func setDefaultParams(params BatchParams) BatchParams {
// BatchActivity is activity for processing batch operation
func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetails, error) {
batcher := ctx.Value(batcherContextKey).(*Batcher)
client := cclient.NewClient(batcher.svcClient, batchParams.DomainName, &cclient.Options{})

hbd := HeartBeatDetails{}
startOver := true
Expand All @@ -208,7 +208,9 @@ func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetai
}

if startOver {
resp, err := countWorkflowsWithRetry(ctx, batcher.svcClient, batchParams)
resp, err := client.CountWorkflow(ctx, &shared.CountWorkflowExecutionsRequest{
Query: common.StringPtr(batchParams.Query),
})
if err != nil {
return HeartBeatDetails{}, err
}
Expand All @@ -218,14 +220,18 @@ func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetai
taskCh := make(chan taskDetail, pageSize)
respCh := make(chan error, pageSize)
for i := 0; i < batchParams.Concurrency; i++ {
go startTaskProcessor(ctx, batchParams, taskCh, respCh, rateLimiter)
go startTaskProcessor(ctx, batchParams, taskCh, respCh, rateLimiter, client)
}

for {
// TODO https://github.com/uber/cadence/issues/2154
// Need to improve scan concurrency because it will hold an ES resource until the workflow finishes.
// And we can't use list API because terminate / reset will mutate the result.
resp, err := scanWorkflowsWithRetry(ctx, batcher.svcClient, hbd.PageToken, batchParams)
resp, err := client.ScanWorkflow(ctx, &shared.ListWorkflowExecutionsRequest{
PageSize: common.Int32Ptr(int32(pageSize)),
NextPageToken: hbd.PageToken,
Query: common.StringPtr(batchParams.Query),
})
if err != nil {
return HeartBeatDetails{}, err
}
Expand Down Expand Up @@ -277,65 +283,13 @@ func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetai
return hbd, nil
}

// countWorkflowsWithRetry calls CountWorkflowExecutions on svcClient for the
// domain and query carried in batchParams, re-running failed attempts through
// backoff.Retry.
//
// The retry policy is built from rpcTimeout; its maximum interval and its
// expiration interval are both set to batchParams.ActivityHeartBeatTimeout.
// Every error is reported as retryable. Each failed attempt increments the
// BatcherProcessorFailures counter and is written to the activity logger.
//
// ctx must carry a *Batcher under batcherContextKey; the unchecked type
// assertion panics otherwise.
//
// It returns the response of the most recent attempt together with the error
// reported by backoff.Retry.
func countWorkflowsWithRetry(ctx context.Context, svcClient workflowserviceclient.Interface, batchParams BatchParams) (*shared.CountWorkflowExecutionsResponse, error) {
	b := ctx.Value(batcherContextKey).(*Batcher)

	retryPolicy := backoff.NewExponentialRetryPolicy(rpcTimeout)
	retryPolicy.SetMaximumInterval(batchParams.ActivityHeartBeatTimeout)
	retryPolicy.SetExpirationInterval(batchParams.ActivityHeartBeatTimeout)

	var countResp *shared.CountWorkflowExecutionsResponse

	// attempt performs a single RPC and records the failure, if any.
	attempt := func() error {
		request := &shared.CountWorkflowExecutionsRequest{
			Domain: common.StringPtr(batchParams.DomainName),
			Query:  common.StringPtr(batchParams.Query),
		}
		result, callErr := svcClient.CountWorkflowExecutions(ctx, request)
		countResp = result
		if callErr == nil {
			return nil
		}
		b.metricsClient.IncCounter(metrics.BatcherScope, metrics.BatcherProcessorFailures)
		getActivityLogger(ctx).Error("Failed to countWorkflowsWithRetry for batch operation task", tag.Error(callErr))
		return callErr
	}

	// retryAlways marks every error as retryable; only the policy stops the loop.
	retryAlways := func(error) bool {
		return true
	}

	retryErr := backoff.Retry(attempt, retryPolicy, retryAlways)
	return countResp, retryErr
}

// scanWorkflowsWithRetry calls ScanWorkflowExecutions on svcClient for the
// domain and query carried in batchParams, requesting pageSize entries starting
// at pageToken, and re-runs failed attempts through backoff.Retry.
//
// The retry policy is built from rpcTimeout; its maximum interval and its
// expiration interval are both set to batchParams.ActivityHeartBeatTimeout.
// Every error is reported as retryable. Each failed attempt increments the
// BatcherProcessorFailures counter and is written to the activity logger.
//
// ctx must carry a *Batcher under batcherContextKey; the unchecked type
// assertion panics otherwise.
//
// It returns the response of the most recent attempt together with the error
// reported by backoff.Retry.
func scanWorkflowsWithRetry(ctx context.Context, svcClient workflowserviceclient.Interface, pageToken []byte, batchParams BatchParams) (*shared.ListWorkflowExecutionsResponse, error) {
	b := ctx.Value(batcherContextKey).(*Batcher)

	retryPolicy := backoff.NewExponentialRetryPolicy(rpcTimeout)
	retryPolicy.SetMaximumInterval(batchParams.ActivityHeartBeatTimeout)
	retryPolicy.SetExpirationInterval(batchParams.ActivityHeartBeatTimeout)

	var scanResp *shared.ListWorkflowExecutionsResponse

	// attempt performs a single RPC and records the failure, if any.
	attempt := func() error {
		request := &shared.ListWorkflowExecutionsRequest{
			PageSize:      common.Int32Ptr(int32(pageSize)),
			Domain:        common.StringPtr(batchParams.DomainName),
			NextPageToken: pageToken,
			Query:         common.StringPtr(batchParams.Query),
		}
		result, callErr := svcClient.ScanWorkflowExecutions(ctx, request)
		scanResp = result
		if callErr == nil {
			return nil
		}
		b.metricsClient.IncCounter(metrics.BatcherScope, metrics.BatcherProcessorFailures)
		getActivityLogger(ctx).Error("Failed to scanWorkflowsWithRetry for batch operation task", tag.Error(callErr))
		return callErr
	}

	// retryAlways marks every error as retryable; only the policy stops the loop.
	retryAlways := func(error) bool {
		return true
	}

	retryErr := backoff.Retry(attempt, retryPolicy, retryAlways)
	return scanResp, retryErr
}

func startTaskProcessor(
ctx context.Context,
batchParams BatchParams,
taskCh chan taskDetail,
respCh chan error,
limiter *rate.Limiter,
client cclient.Client,
) {
batcher := ctx.Value(batcherContextKey).(*Batcher)
for {
Expand All @@ -350,7 +304,7 @@ func startTaskProcessor(

switch batchParams.BatchType {
case BatchTypeTerminate:
err = processTerminateTask(ctx, limiter, task, batchParams)
err = processTerminateTask(ctx, limiter, task, batchParams, client)
}
if err != nil {
batcher.metricsClient.IncCounter(metrics.BatcherScope, metrics.BatcherProcessorFailures)
Expand All @@ -372,8 +326,7 @@ func startTaskProcessor(
}
}

func processTerminateTask(ctx context.Context, limiter *rate.Limiter, task taskDetail, batchParams BatchParams) error {
batcher := ctx.Value(batcherContextKey).(*Batcher)
func processTerminateTask(ctx context.Context, limiter *rate.Limiter, task taskDetail, batchParams BatchParams, client cclient.Client) error {

wfs := []shared.WorkflowExecution{task.execution}
for len(wfs) > 0 {
Expand All @@ -384,14 +337,7 @@ func processTerminateTask(ctx context.Context, limiter *rate.Limiter, task taskD
return err
}

newCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
err = batcher.svcClient.TerminateWorkflowExecution(newCtx, &shared.TerminateWorkflowExecutionRequest{
Domain: common.StringPtr(batchParams.DomainName),
WorkflowExecution: &wf,
Reason: common.StringPtr(batchParams.Reason),
Identity: common.StringPtr(batchWFTypeName),
})
cancel()
err = client.TerminateWorkflow(ctx, wf.GetWorkflowId(), wf.GetRunId(), batchParams.Reason, []byte{})
if err != nil {
// EntityNotExistsError means wf is not running or deleted
_, ok := err.(*shared.EntityNotExistsError)
Expand All @@ -400,12 +346,7 @@ func processTerminateTask(ctx context.Context, limiter *rate.Limiter, task taskD
}
}
wfs = wfs[1:]
newCtx, cancel = context.WithTimeout(ctx, rpcTimeout)
resp, err := batcher.svcClient.DescribeWorkflowExecution(newCtx, &shared.DescribeWorkflowExecutionRequest{
Domain: common.StringPtr(batchParams.DomainName),
Execution: &wf,
})
cancel()
resp, err := client.DescribeWorkflowExecution(ctx, wf.GetWorkflowId(), wf.GetRunId())
if err != nil {
// EntityNotExistsError means wf is deleted
_, ok := err.(*shared.EntityNotExistsError)
Expand Down