Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement workflow terminate command #436

Merged
merged 8 commits into from
Feb 7, 2024
8 changes: 7 additions & 1 deletion temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,7 @@ func NewTemporalWorkflowStartCommand(cctx *CommandContext, parent *TemporalWorkf
type TemporalWorkflowTerminateCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
SingleWorkflowOrBatchOptions
}

func NewTemporalWorkflowTerminateCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowTerminateCommand {
Expand All @@ -786,8 +787,13 @@ func NewTemporalWorkflowTerminateCommand(cctx *CommandContext, parent *TemporalW
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "terminate [flags]"
s.Command.Short = "Terminate Workflow Execution by ID or List Filter."
s.Command.Long = "TODO"
if hasHighlighting {
s.Command.Long = "The \x1b[1mtemporal workflow terminate\x1b[0m command is used to terminate a Workflow Execution. Canceling a running Workflow Execution records a \x1b[1mWorkflowExecutionTerminated\x1b[0m event as the closing Event in the workflow's Event History. No further command tasks may be scheduled after running this command.\n\nExecutions may be terminated by ID:\n\x1b[1mtemporal workflow terminate --workflow-id MyWorkflowId\x1b[0m\n\n...or in bulk via a visibility query list filter:\n\n\x1b[1mtemporal workflow terminate --query=MyQuery\x1b[0m\n\nUse the options listed below to change the behavior of this command."
} else {
s.Command.Long = "The `temporal workflow terminate` command is used to terminate a Workflow Execution. Canceling a running Workflow Execution records a `WorkflowExecutionTerminated` event as the closing Event in the workflow's Event History. No further command tasks may be scheduled after running this command.\n\nExecutions may be terminated by ID:\n```\ntemporal workflow terminate --workflow-id MyWorkflowId\n```\n\n...or in bulk via a visibility query list filter:\n\n```\ntemporal workflow terminate --query=MyQuery\n```\n\nUse the options listed below to change the behavior of this command."
}
s.Command.Args = cobra.NoArgs
s.SingleWorkflowOrBatchOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down
48 changes: 45 additions & 3 deletions temporalcli/commands.workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,40 @@ func (*TemporalWorkflowStackCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
}

func (*TemporalWorkflowTerminateCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
func (c *TemporalWorkflowTerminateCommand) run(cctx *CommandContext, _ []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

exec, batchReq, err := c.workflowExecOrBatch(cctx, c.Parent.Namespace, cl, func(p *singleOrBatchParams) {
// You're allowed to specify a reason when terminating a workflow
p.AllowReasonWithWorkflowID = true
})
if err != nil {
return err
}

// Run single or batch
if exec != nil {
err = cl.TerminateWorkflow(cctx, exec.WorkflowId, exec.RunId, c.Reason)
if err != nil {
return fmt.Errorf("failed to terminate workflow: %w", err)
}
cctx.Printer.Println("Workflow terminated")
} else if batchReq != nil {
batchReq.Operation = &workflowservice.StartBatchOperationRequest_TerminationOperation{
TerminationOperation: &batch.BatchOperationTermination{
Identity: clientIdentity(),
},
}
if err := startBatchJob(cctx, cl, batchReq); err != nil {
return err
}
}

return nil
}

func (*TemporalWorkflowTraceCommand) run(*CommandContext, []string) error {
Expand All @@ -96,16 +128,26 @@ func (*TemporalWorkflowUpdateCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
}

type singleOrBatchParams struct {
AllowReasonWithWorkflowID bool
}

func (s *SingleWorkflowOrBatchOptions) workflowExecOrBatch(
cctx *CommandContext,
namespace string,
cl client.Client,
options ...func(*singleOrBatchParams),
tdeebswihart marked this conversation as resolved.
Show resolved Hide resolved
) (*common.WorkflowExecution, *workflowservice.StartBatchOperationRequest, error) {
var p singleOrBatchParams
for _, opt := range options {
opt(&p)
}

// If workflow is set, we return single execution
if s.WorkflowId != "" {
if s.Query != "" {
return nil, nil, fmt.Errorf("cannot set query when workflow ID is set")
} else if s.Reason != "" {
} else if !p.AllowReasonWithWorkflowID && s.Reason != "" {
return nil, nil, fmt.Errorf("cannot set reason when workflow ID is set")
} else if s.Yes {
return nil, nil, fmt.Errorf("cannot set 'yes' when workflow ID is set")
Expand Down
97 changes: 97 additions & 0 deletions temporalcli/commands.workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,100 @@ func (s *SharedServerSuite) testSignalBatchWorkflow(json bool) *CommandResult {
}
return res
}
func (s *SharedServerSuite) TestWorkflow_Terminate_SingleWorkflowSuccess() {
tdeebswihart marked this conversation as resolved.
Show resolved Hide resolved
// Make workflow wait for termination and then return the context's error
tdeebswihart marked this conversation as resolved.
Show resolved Hide resolved
s.Worker.OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
ctx.Done().Receive(ctx, nil)
return nil, ctx.Err()
})

// Start the workflow
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{TaskQueue: s.Worker.Options.TaskQueue},
DevWorkflow,
"ignored",
)
s.NoError(err)

// Send terminate
res := s.Execute(
"workflow", "terminate",
"--address", s.Address(),
"-w", run.GetID(),
// Ensure that we can provide a reason
"--reason", "terminate-test",
)
s.NoError(res.Err)

// Confirm workflow was terminated
s.Contains(run.Get(s.Context, nil).Error(), "terminated")
tdeebswihart marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccess() {
res := s.testTerminateBatchWorkflow(false)
s.Contains(res.Stdout.String(), "approximately 5 workflow(s)")
s.Contains(res.Stdout.String(), "Started batch")
}

func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccessJSON() {
res := s.testTerminateBatchWorkflow(true)
var jsonRes map[string]any
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonRes))
s.NotEmpty(jsonRes["batchJobId"])
}

func (s *SharedServerSuite) testTerminateBatchWorkflow(json bool) *CommandResult {
// Make workflow wait for terminate and then return the context's error
s.Worker.OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
ctx.Done().Receive(ctx, nil)
return nil, ctx.Err()
})

// Start 5 workflows
runs := make([]client.WorkflowRun, 5)
searchAttr := "keyword-" + uuid.NewString()
for i := range runs {
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{
TaskQueue: s.Worker.Options.TaskQueue,
SearchAttributes: map[string]any{"CustomKeywordField": searchAttr},
},
DevWorkflow,
"ignored",
)
s.NoError(err)
runs[i] = run
}

// Wait for all to appear in list
s.Eventually(func() bool {
resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{
Query: "CustomKeywordField = '" + searchAttr + "'",
})
s.NoError(err)
return len(resp.Executions) == len(runs)
}, 3*time.Second, 100*time.Millisecond)

// Send batch terminate with a "y" for non-json or "--yes" for json
args := []string{
"workflow", "terminate",
"--address", s.Address(),
"--query", "CustomKeywordField = '" + searchAttr + "'",
"--reason", "terminate-test",
}
if json {
args = append(args, "--yes", "-o", "json")
} else {
s.CommandHarness.Stdin.WriteString("y\n")
}
res := s.Execute(args...)
s.NoError(res.Err)

// Confirm that all workflows are terminated
for _, run := range runs {
s.Contains(run.Get(s.Context, nil).Error(), "terminated")
tdeebswihart marked this conversation as resolved.
Show resolved Hide resolved
}
return res
}
21 changes: 19 additions & 2 deletions temporalcli/commandsmd/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,29 @@ temporal workflow start \

### temporal workflow terminate: Terminate Workflow Execution by ID or List Filter.

TODO
The `temporal workflow terminate` command is used to terminate a [Workflow Execution](/concepts/what-is-a-workflow-execution). Canceling a running Workflow Execution records a `WorkflowExecutionTerminated` event as the closing Event in the workflow's Event History. No further command tasks may be scheduled after running this command.
tdeebswihart marked this conversation as resolved.
Show resolved Hide resolved

Executions may be terminated by [ID](/concepts/what-is-a-workflow-id):
```
temporal workflow terminate --workflow-id MyWorkflowId
```

...or in bulk via a visibility query [list filter](/concepts/what-is-a-list-filter):

```
temporal workflow terminate --query=MyQuery
```

Use the options listed below to change the behavior of this command.

#### Options

Includes options set for [single workflow or batch](#options-set-single-workflow-or-batch)
tdeebswihart marked this conversation as resolved.
Show resolved Hide resolved

### temporal workflow trace: Trace progress of a Workflow Execution and its children.

TODO

### temporal workflow update: Updates a running workflow synchronously.

TODO
TODO
Loading