Skip to content

Commit

Permalink
Use sdk client instead of rpc client (#2192)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored and yux0 committed Jul 10, 2019
1 parent 23e2ec9 commit 68b4c7c
Showing 1 changed file with 16 additions and 75 deletions.
91 changes: 16 additions & 75 deletions service/worker/batcher/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@ import (
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"go.uber.org/cadence"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/activity"
cclient "go.uber.org/cadence/client"
"go.uber.org/cadence/workflow"
"golang.org/x/time/rate"
)
Expand Down Expand Up @@ -194,6 +193,7 @@ func setDefaultParams(params BatchParams) BatchParams {
// BatchActivity is activity for processing batch operation
func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetails, error) {
batcher := ctx.Value(batcherContextKey).(*Batcher)
client := cclient.NewClient(batcher.svcClient, batchParams.DomainName, &cclient.Options{})

hbd := HeartBeatDetails{}
startOver := true
Expand All @@ -208,7 +208,9 @@ func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetai
}

if startOver {
resp, err := countWorkflowsWithRetry(ctx, batcher.svcClient, batchParams)
resp, err := client.CountWorkflow(ctx, &shared.CountWorkflowExecutionsRequest{
Query: common.StringPtr(batchParams.Query),
})
if err != nil {
return HeartBeatDetails{}, err
}
Expand All @@ -218,14 +220,18 @@ func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetai
taskCh := make(chan taskDetail, pageSize)
respCh := make(chan error, pageSize)
for i := 0; i < batchParams.Concurrency; i++ {
go startTaskProcessor(ctx, batchParams, taskCh, respCh, rateLimiter)
go startTaskProcessor(ctx, batchParams, taskCh, respCh, rateLimiter, client)
}

for {
// TODO https://github.com/uber/cadence/issues/2154
// Need to improve scan concurrency because it will hold an ES resource until the workflow finishes.
// And we can't use list API because terminate / reset will mutate the result.
resp, err := scanWorkflowsWithRetry(ctx, batcher.svcClient, hbd.PageToken, batchParams)
resp, err := client.ScanWorkflow(ctx, &shared.ListWorkflowExecutionsRequest{
PageSize: common.Int32Ptr(int32(pageSize)),
NextPageToken: hbd.PageToken,
Query: common.StringPtr(batchParams.Query),
})
if err != nil {
return HeartBeatDetails{}, err
}
Expand Down Expand Up @@ -277,65 +283,13 @@ func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetai
return hbd, nil
}

// countWorkflowsWithRetry calls CountWorkflowExecutions on svcClient for the
// domain and query carried in batchParams, retrying through backoff.Retry.
//
// The retry policy is created from rpcTimeout, and both its maximum interval
// and its expiration interval are set to batchParams.ActivityHeartBeatTimeout.
// Every error is treated as retryable. Each failed attempt increments the
// BatcherProcessorFailures counter and is logged through the activity logger.
//
// It returns the response captured from the most recent attempt together with
// whatever error backoff.Retry reports.
func countWorkflowsWithRetry(ctx context.Context, svcClient workflowserviceclient.Interface, batchParams BatchParams) (*shared.CountWorkflowExecutionsResponse, error) {
	batcher := ctx.Value(batcherContextKey).(*Batcher)

	retryPolicy := backoff.NewExponentialRetryPolicy(rpcTimeout)
	retryPolicy.SetMaximumInterval(batchParams.ActivityHeartBeatTimeout)
	retryPolicy.SetExpirationInterval(batchParams.ActivityHeartBeatTimeout)

	var countResp *shared.CountWorkflowExecutionsResponse

	// One attempt of the RPC; the request is rebuilt on every attempt.
	attempt := func() error {
		var callErr error
		countResp, callErr = svcClient.CountWorkflowExecutions(ctx, &shared.CountWorkflowExecutionsRequest{
			Domain: common.StringPtr(batchParams.DomainName),
			Query:  common.StringPtr(batchParams.Query),
		})
		if callErr == nil {
			return nil
		}

		batcher.metricsClient.IncCounter(metrics.BatcherScope, metrics.BatcherProcessorFailures)
		getActivityLogger(ctx).Error("Failed to countWorkflowsWithRetry for batch operation task", tag.Error(callErr))
		return callErr
	}

	// No error is considered terminal here.
	alwaysRetry := func(error) bool { return true }

	retryErr := backoff.Retry(attempt, retryPolicy, alwaysRetry)
	return countResp, retryErr
}

// scanWorkflowsWithRetry calls ScanWorkflowExecutions on svcClient for one
// page of results, retrying through backoff.Retry.
//
// The request uses the domain and query from batchParams, the package-level
// pageSize as the page size, and pageToken as the next-page token.
//
// The retry policy is created from rpcTimeout, and both its maximum interval
// and its expiration interval are set to batchParams.ActivityHeartBeatTimeout.
// Every error is treated as retryable. Each failed attempt increments the
// BatcherProcessorFailures counter and is logged through the activity logger.
//
// It returns the response captured from the most recent attempt together with
// whatever error backoff.Retry reports.
func scanWorkflowsWithRetry(ctx context.Context, svcClient workflowserviceclient.Interface, pageToken []byte, batchParams BatchParams) (*shared.ListWorkflowExecutionsResponse, error) {
	batcher := ctx.Value(batcherContextKey).(*Batcher)

	retryPolicy := backoff.NewExponentialRetryPolicy(rpcTimeout)
	retryPolicy.SetMaximumInterval(batchParams.ActivityHeartBeatTimeout)
	retryPolicy.SetExpirationInterval(batchParams.ActivityHeartBeatTimeout)

	var scanResp *shared.ListWorkflowExecutionsResponse

	// One attempt of the RPC; the request is rebuilt on every attempt.
	scanOnce := func() error {
		var callErr error
		scanResp, callErr = svcClient.ScanWorkflowExecutions(ctx, &shared.ListWorkflowExecutionsRequest{
			PageSize:      common.Int32Ptr(int32(pageSize)),
			Domain:        common.StringPtr(batchParams.DomainName),
			NextPageToken: pageToken,
			Query:         common.StringPtr(batchParams.Query),
		})
		if callErr == nil {
			return nil
		}

		batcher.metricsClient.IncCounter(metrics.BatcherScope, metrics.BatcherProcessorFailures)
		getActivityLogger(ctx).Error("Failed to scanWorkflowsWithRetry for batch operation task", tag.Error(callErr))
		return callErr
	}

	// No error is considered terminal here.
	alwaysRetry := func(error) bool { return true }

	retryErr := backoff.Retry(scanOnce, retryPolicy, alwaysRetry)
	return scanResp, retryErr
}

func startTaskProcessor(
ctx context.Context,
batchParams BatchParams,
taskCh chan taskDetail,
respCh chan error,
limiter *rate.Limiter,
client cclient.Client,
) {
batcher := ctx.Value(batcherContextKey).(*Batcher)
for {
Expand All @@ -350,7 +304,7 @@ func startTaskProcessor(

switch batchParams.BatchType {
case BatchTypeTerminate:
err = processTerminateTask(ctx, limiter, task, batchParams)
err = processTerminateTask(ctx, limiter, task, batchParams, client)
}
if err != nil {
batcher.metricsClient.IncCounter(metrics.BatcherScope, metrics.BatcherProcessorFailures)
Expand All @@ -372,8 +326,7 @@ func startTaskProcessor(
}
}

func processTerminateTask(ctx context.Context, limiter *rate.Limiter, task taskDetail, batchParams BatchParams) error {
batcher := ctx.Value(batcherContextKey).(*Batcher)
func processTerminateTask(ctx context.Context, limiter *rate.Limiter, task taskDetail, batchParams BatchParams, client cclient.Client) error {

wfs := []shared.WorkflowExecution{task.execution}
for len(wfs) > 0 {
Expand All @@ -384,14 +337,7 @@ func processTerminateTask(ctx context.Context, limiter *rate.Limiter, task taskD
return err
}

newCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
err = batcher.svcClient.TerminateWorkflowExecution(newCtx, &shared.TerminateWorkflowExecutionRequest{
Domain: common.StringPtr(batchParams.DomainName),
WorkflowExecution: &wf,
Reason: common.StringPtr(batchParams.Reason),
Identity: common.StringPtr(batchWFTypeName),
})
cancel()
err = client.TerminateWorkflow(ctx, wf.GetWorkflowId(), wf.GetRunId(), batchParams.Reason, []byte{})
if err != nil {
// EntityNotExistsError means wf is not running or deleted
_, ok := err.(*shared.EntityNotExistsError)
Expand All @@ -400,12 +346,7 @@ func processTerminateTask(ctx context.Context, limiter *rate.Limiter, task taskD
}
}
wfs = wfs[1:]
newCtx, cancel = context.WithTimeout(ctx, rpcTimeout)
resp, err := batcher.svcClient.DescribeWorkflowExecution(newCtx, &shared.DescribeWorkflowExecutionRequest{
Domain: common.StringPtr(batchParams.DomainName),
Execution: &wf,
})
cancel()
resp, err := client.DescribeWorkflowExecution(ctx, wf.GetWorkflowId(), wf.GetRunId())
if err != nil {
// EntityNotExistsError means wf is deleted
_, ok := err.(*shared.EntityNotExistsError)
Expand Down

0 comments on commit 68b4c7c

Please sign in to comment.