Skip to content

Commit

Permalink
Add batch signal/cancel support (#2190)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Jul 10, 2019
1 parent c54d09b commit 0fc9928
Showing 1 changed file with 61 additions and 7 deletions.
68 changes: 61 additions & 7 deletions service/worker/batcher/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ const (
const (
// BatchTypeTerminate is batch type for terminating workflows
BatchTypeTerminate = "terminate"
// BatchTypeCancel is the batch type for canceling workflows
BatchTypeCancel = "cancel"
// BatchTypeSignal is batch type for signaling workflows
BatchTypeSignal = "signal"
)

type (
Expand All @@ -67,6 +71,20 @@ type (
TerminateChildren *bool
}

// CancelParams is the parameters for canceling workflow
CancelParams struct {
// this indicates whether to cancel children workflow. Default to true.
// TODO https://github.com/uber/cadence/issues/2159
// Ideally default should be childPolicy of the workflow. But it's currently totally broken.
CancelChildren *bool
}

// SignalParams is the parameters for signaling workflow
SignalParams struct {
SignalName string
Input string
}

// BatchParams is the parameters for batch operation workflow
BatchParams struct {
// Target domain to execute batch operation
Expand All @@ -81,6 +99,10 @@ type (
// Below are all optional
// TerminateParams is params only for BatchTypeTerminate
TerminateParams TerminateParams
// CancelParams is params only for BatchTypeCancel
CancelParams CancelParams
// SignalParams is params only for BatchTypeSignal
SignalParams SignalParams
// RPS of processing. Default to defaultRPS
// TODO we will implement smarter way than this static rate limiter: https://github.com/uber/cadence/issues/2138
RPS int
Expand Down Expand Up @@ -158,6 +180,13 @@ func validateParams(params BatchParams) error {
return fmt.Errorf("must provide required parameters: BatchType/Reason/DomainName/Query")
}
switch params.BatchType {
case BatchTypeSignal:
if params.SignalParams.SignalName == "" {
return fmt.Errorf("must provide signal name")
}
return nil
case BatchTypeCancel:
fallthrough
case BatchTypeTerminate:
return nil
default:
Expand Down Expand Up @@ -304,7 +333,23 @@ func startTaskProcessor(

switch batchParams.BatchType {
case BatchTypeTerminate:
err = processTerminateTask(ctx, limiter, task, batchParams, client)
err = processTask(ctx, limiter, task, batchParams, client,
batchParams.TerminateParams.TerminateChildren,
func(workflowID, runID string) error {
return client.TerminateWorkflow(ctx, workflowID, runID, batchParams.Reason, []byte{})
})
case BatchTypeCancel:
err = processTask(ctx, limiter, task, batchParams, client,
batchParams.CancelParams.CancelChildren,
func(workflowID, runID string) error {
return client.CancelWorkflow(ctx, workflowID, runID)
})
case BatchTypeSignal:
err = processTask(ctx, limiter, task, batchParams, client, common.BoolPtr(false),
func(workflowID, runID string) error {
return client.SignalWorkflow(ctx, workflowID, runID,
batchParams.SignalParams.SignalName, []byte(batchParams.SignalParams.Input))
})
}
if err != nil {
batcher.metricsClient.IncCounter(metrics.BatcherScope, metrics.BatcherProcessorFailures)
Expand All @@ -326,8 +371,15 @@ func startTaskProcessor(
}
}

func processTerminateTask(ctx context.Context, limiter *rate.Limiter, task taskDetail, batchParams BatchParams, client cclient.Client) error {

func processTask(
ctx context.Context,
limiter *rate.Limiter,
task taskDetail,
batchParams BatchParams,
client cclient.Client,
applyOnChild *bool,
procFn func(string, string) error,
) error {
wfs := []shared.WorkflowExecution{task.execution}
for len(wfs) > 0 {
wf := wfs[0]
Expand All @@ -337,7 +389,7 @@ func processTerminateTask(ctx context.Context, limiter *rate.Limiter, task taskD
return err
}

err = client.TerminateWorkflow(ctx, wf.GetWorkflowId(), wf.GetRunId(), batchParams.Reason, []byte{})
err = procFn(wf.GetWorkflowId(), wf.GetRunId())
if err != nil {
// EntityNotExistsError means wf is not running or deleted
_, ok := err.(*shared.EntityNotExistsError)
Expand All @@ -355,17 +407,19 @@ func processTerminateTask(ctx context.Context, limiter *rate.Limiter, task taskD
}
continue
}

// TODO https://github.com/uber/cadence/issues/2159
// ChildPolicy is totally broken in Cadence, we need to fix it before using
if *batchParams.TerminateParams.TerminateChildren && len(resp.PendingChildren) > 0 {
getActivityLogger(ctx).Info("Found more child workflows to terminate", tag.Number(int64(len(resp.PendingChildren))))
// By default should use ChildPolicy, but it is totally broken in Cadence, we need to fix it before using
if applyOnChild != nil && *applyOnChild && len(resp.PendingChildren) > 0 {
getActivityLogger(ctx).Info("Found more child workflows to process", tag.Number(int64(len(resp.PendingChildren))))
for _, ch := range resp.PendingChildren {
wfs = append(wfs, shared.WorkflowExecution{
WorkflowId: ch.WorkflowID,
RunId: ch.RunID,
})
}
}

activity.RecordHeartbeat(ctx, task.hbd)
}

Expand Down

0 comments on commit 0fc9928

Please sign in to comment.