Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement workflow terminate command #436

Merged
merged 8 commits into from
Feb 7, 2024
22 changes: 18 additions & 4 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ func (v *SingleWorkflowOrBatchOptions) buildFlags(cctx *CommandContext, f *pflag
f.StringVarP(&v.WorkflowId, "workflow-id", "w", "", "Workflow Id. Either this or query must be set.")
f.StringVarP(&v.RunId, "run-id", "r", "", "Run Id. Cannot be set when query is set.")
f.StringVarP(&v.Query, "query", "q", "", "Start a batch to Signal Workflow Executions with given List Filter. Either this or Workflow Id must be set.")
f.StringVar(&v.Reason, "reason", "", "Reason to perform batch. Only allowed if query is present. Defaults to message with user name and time.")
f.StringVar(&v.Reason, "reason", "", "Reason to perform batch. Only allowed if query is present unless the command specifies otherwise. Defaults to message with the current user's name.")
f.BoolVarP(&v.Yes, "yes", "y", false, "Confirm prompt to perform batch. Only allowed if query is present.")
}

Expand Down Expand Up @@ -776,8 +776,13 @@ func NewTemporalWorkflowStartCommand(cctx *CommandContext, parent *TemporalWorkf
}

type TemporalWorkflowTerminateCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
Parent *TemporalWorkflowCommand
Command cobra.Command
WorkflowId string
RunId string
Query string
Reason string
Yes bool
}

func NewTemporalWorkflowTerminateCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowTerminateCommand {
Expand All @@ -786,8 +791,17 @@ func NewTemporalWorkflowTerminateCommand(cctx *CommandContext, parent *TemporalW
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "terminate [flags]"
s.Command.Short = "Terminate Workflow Execution by ID or List Filter."
s.Command.Long = "TODO"
if hasHighlighting {
s.Command.Long = "The \x1b[1mtemporal workflow terminate\x1b[0m command is used to terminate a Workflow Execution. \nCanceling a running Workflow Execution records a \x1b[1mWorkflowExecutionTerminated\x1b[0m event as the closing Event in the workflow's Event History. \nWorkflow code is oblivious to termination. Use \x1b[1mtemporal workflow cancel\x1b[0m if you need to perform cleanup in your workflow.\n\nExecutions may be terminated by ID with an optional reason:\n\x1b[1mtemporal workflow terminate [--reason my-reason] --workflow-id MyWorkflowId\x1b[0m\n\n...or in bulk via a visibility query list filter:\n\x1b[1mtemporal workflow terminate --query=MyQuery\x1b[0m\n\nUse the options listed below to change the behavior of this command."
} else {
s.Command.Long = "The `temporal workflow terminate` command is used to terminate a Workflow Execution. \nCanceling a running Workflow Execution records a `WorkflowExecutionTerminated` event as the closing Event in the workflow's Event History. \nWorkflow code is oblivious to termination. Use `temporal workflow cancel` if you need to perform cleanup in your workflow.\n\nExecutions may be terminated by ID with an optional reason:\n```\ntemporal workflow terminate [--reason my-reason] --workflow-id MyWorkflowId\n```\n\n...or in bulk via a visibility query list filter:\n```\ntemporal workflow terminate --query=MyQuery\n```\n\nUse the options listed below to change the behavior of this command."
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow Id. Either this or query must be set.")
s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run Id. Cannot be set when query is set.")
s.Command.Flags().StringVarP(&s.Query, "query", "q", "", "Start a batch to terminate Workflow Executions with given List Filter. Either this or Workflow Id must be set.")
s.Command.Flags().StringVar(&s.Reason, "reason", "", "Reason for termination. Defaults to message with the current user's name.")
s.Command.Flags().BoolVarP(&s.Yes, "yes", "y", false, "Confirm prompt to perform batch. Only allowed if query is present.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down
72 changes: 63 additions & 9 deletions temporalcli/commands.workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (c *TemporalWorkflowSignalCommand) run(cctx *CommandContext, args []string)
return err
}

exec, batchReq, err := c.workflowExecOrBatch(cctx, c.Parent.Namespace, cl)
exec, batchReq, err := c.workflowExecOrBatch(cctx, c.Parent.Namespace, cl, singleOrBatchOverrides{})
if err != nil {
return err
}
Expand Down Expand Up @@ -84,8 +84,53 @@ func (*TemporalWorkflowStackCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
}

func (*TemporalWorkflowTerminateCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
func (c *TemporalWorkflowTerminateCommand) run(cctx *CommandContext, _ []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

// We create a faux SingleWorkflowOrBatchOptions to use the shared logic
opts := SingleWorkflowOrBatchOptions{
WorkflowId: c.WorkflowId,
RunId: c.RunId,
Query: c.Query,
Reason: c.Reason,
Yes: c.Yes,
}

exec, batchReq, err := opts.workflowExecOrBatch(cctx, c.Parent.Namespace, cl, singleOrBatchOverrides{
// You're allowed to specify a reason when terminating a workflow
AllowReasonWithWorkflowID: true,
})
if err != nil {
return err
}

// Run single or batch
if exec != nil {
reason := c.Reason
if reason == "" {
reason = defaultReason()
}
err = cl.TerminateWorkflow(cctx, exec.WorkflowId, exec.RunId, reason)
if err != nil {
return fmt.Errorf("failed to terminate workflow: %w", err)
}
cctx.Printer.Println("Workflow terminated")
} else if batchReq != nil {
batchReq.Operation = &workflowservice.StartBatchOperationRequest_TerminationOperation{
TerminationOperation: &batch.BatchOperationTermination{
Identity: clientIdentity(),
},
}
if err := startBatchJob(cctx, cl, batchReq); err != nil {
return err
}
}

return nil
}

func (*TemporalWorkflowTraceCommand) run(*CommandContext, []string) error {
Expand All @@ -96,16 +141,29 @@ func (*TemporalWorkflowUpdateCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
}

func defaultReason() string {
username := "<unknown-user>"
if u, err := user.Current(); err != nil && u.Username != "" {
username = u.Username
}
return "Requested from CLI by " + username
}

type singleOrBatchOverrides struct {
AllowReasonWithWorkflowID bool
}

func (s *SingleWorkflowOrBatchOptions) workflowExecOrBatch(
cctx *CommandContext,
namespace string,
cl client.Client,
overrides singleOrBatchOverrides,
Comment on lines 158 to +160
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably can toss all of this in a single struct instead of keeping overrides separate, but no biggie

) (*common.WorkflowExecution, *workflowservice.StartBatchOperationRequest, error) {
// If workflow is set, we return single execution
if s.WorkflowId != "" {
if s.Query != "" {
return nil, nil, fmt.Errorf("cannot set query when workflow ID is set")
} else if s.Reason != "" {
} else if s.Reason != "" && !overrides.AllowReasonWithWorkflowID {
return nil, nil, fmt.Errorf("cannot set reason when workflow ID is set")
} else if s.Yes {
return nil, nil, fmt.Errorf("cannot set 'yes' when workflow ID is set")
Expand Down Expand Up @@ -139,11 +197,7 @@ func (s *SingleWorkflowOrBatchOptions) workflowExecOrBatch(
// Default the reason if not set
reason := s.Reason
if reason == "" {
username := "<unknown-user>"
if u, err := user.Current(); err != nil && u.Username != "" {
username = u.Username
}
reason = "Requested from CLI by " + username
reason = defaultReason()
}

return nil, &workflowservice.StartBatchOperationRequest{
Expand Down
161 changes: 161 additions & 0 deletions temporalcli/commands.workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/google/uuid"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
Expand Down Expand Up @@ -114,3 +115,163 @@ func (s *SharedServerSuite) testSignalBatchWorkflow(json bool) *CommandResult {
}
return res
}

func (s *SharedServerSuite) TestWorkflow_Terminate_SingleWorkflowSuccess_WithoutReason() {
s.Worker.OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
ctx.Done().Receive(ctx, nil)
return nil, ctx.Err()
})

// Start the workflow
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{TaskQueue: s.Worker.Options.TaskQueue},
DevWorkflow,
"ignored",
)
s.NoError(err)

// Send terminate
res := s.Execute(
"workflow", "terminate",
"--address", s.Address(),
"-w", run.GetID(),
)
s.NoError(res.Err)

// Confirm workflow was terminated
s.Contains(run.Get(s.Context, nil).Error(), "terminated")
// Ensure the termination reason was recorded
iter := s.Client.GetWorkflowHistory(s.Context, run.GetID(), run.GetRunID(), false, enums.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT)
var foundReason bool
for iter.HasNext() {
event, err := iter.Next()
s.NoError(err)
if term := event.GetWorkflowExecutionTerminatedEventAttributes(); term != nil {
foundReason = true
// We're not going to check the value here so we don't pin ourselves to our particular default, but there _should_ be a default reason
s.NotEmpty(term.Reason)
}
}
s.True(foundReason)
}

func (s *SharedServerSuite) TestWorkflow_Terminate_SingleWorkflowSuccess_WithReason() {
s.Worker.OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
ctx.Done().Receive(ctx, nil)
return nil, ctx.Err()
})

// Start the workflow
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{TaskQueue: s.Worker.Options.TaskQueue},
DevWorkflow,
"ignored",
)
s.NoError(err)

// Send terminate
res := s.Execute(
"workflow", "terminate",
"--address", s.Address(),
"-w", run.GetID(),
"--reason", "terminate-test",
)
s.NoError(res.Err)

// Confirm workflow was terminated
s.Contains(run.Get(s.Context, nil).Error(), "terminated")
tdeebswihart marked this conversation as resolved.
Show resolved Hide resolved

// Ensure the termination reason was recorded
iter := s.Client.GetWorkflowHistory(s.Context, run.GetID(), run.GetRunID(), false, enums.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT)
var foundReason bool
for iter.HasNext() {
event, err := iter.Next()
s.NoError(err)
if term := event.GetWorkflowExecutionTerminatedEventAttributes(); term != nil {
foundReason = true
s.Equal("terminate-test", term.Reason)
}
}
s.True(foundReason)
}

func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccess() {
res := s.testTerminateBatchWorkflow(false)
s.Contains(res.Stdout.String(), "approximately 5 workflow(s)")
s.Contains(res.Stdout.String(), "Started batch")
}

func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccessJSON() {
res := s.testTerminateBatchWorkflow(true)
var jsonRes map[string]any
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonRes))
s.NotEmpty(jsonRes["batchJobId"])
}

func (s *SharedServerSuite) testTerminateBatchWorkflow(json bool) *CommandResult {
s.Worker.OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
ctx.Done().Receive(ctx, nil)
return nil, ctx.Err()
})

// Start 5 workflows
runs := make([]client.WorkflowRun, 5)
searchAttr := "keyword-" + uuid.NewString()
for i := range runs {
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{
TaskQueue: s.Worker.Options.TaskQueue,
SearchAttributes: map[string]any{"CustomKeywordField": searchAttr},
},
DevWorkflow,
"ignored",
)
s.NoError(err)
runs[i] = run
}

// Wait for all to appear in list
s.Eventually(func() bool {
resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{
Query: "CustomKeywordField = '" + searchAttr + "'",
})
s.NoError(err)
return len(resp.Executions) == len(runs)
}, 3*time.Second, 100*time.Millisecond)

// Send batch terminate with a "y" for non-json or "--yes" for json
args := []string{
"workflow", "terminate",
"--address", s.Address(),
"--query", "CustomKeywordField = '" + searchAttr + "'",
"--reason", "terminate-test",
}
if json {
args = append(args, "--yes", "-o", "json")
} else {
s.CommandHarness.Stdin.WriteString("y\n")
}
res := s.Execute(args...)
s.NoError(res.Err)

// Confirm that all workflows are terminated
for _, run := range runs {
s.Contains(run.Get(s.Context, nil).Error(), "terminated")
tdeebswihart marked this conversation as resolved.
Show resolved Hide resolved
// Ensure the termination reason was recorded
iter := s.Client.GetWorkflowHistory(s.Context, run.GetID(), run.GetRunID(), false, enums.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT)
var foundReason bool
for iter.HasNext() {
event, err := iter.Next()
s.NoError(err)
if term := event.GetWorkflowExecutionTerminatedEventAttributes(); term != nil {
foundReason = true
s.Equal("terminate-test", term.Reason)
}
}
s.True(foundReason)
}
return res
}
29 changes: 25 additions & 4 deletions temporalcli/commandsmd/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,7 @@ Includes options set for [payload input](#options-set-for-payload-input).
* `--run-id`, `-r` (string) - Run Id. Cannot be set when query is set.
* `--query`, `-q` (string) - Start a batch to Signal Workflow Executions with given List Filter. Either this or
Workflow Id must be set.
* `--reason` (string) - Reason to perform batch. Only allowed if query is present. Defaults to message with user name
and time.
* `--reason` (string) - Reason to perform batch. Only allowed if query is present unless the command specifies otherwise. Defaults to message with the current user's name.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still mentions batch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're looking at the help for workflow signal rather than workflow terminate. That command only allows a reason to be specified when invoked in batch mode. If you check the options for workflow terminate you'll see I updated it there already:

* `--reason` (string) - Reason to terminate this workflow. Defaults to message with the current user's name.

* `--yes`, `-y` (bool) - Confirm prompt to perform batch. Only allowed if query is present.

### temporal workflow stack: Query a Workflow Execution with __stack_trace as the query type.
Expand Down Expand Up @@ -386,12 +385,34 @@ temporal workflow start \

### temporal workflow terminate: Terminate Workflow Execution by ID or List Filter.

TODO
The `temporal workflow terminate` command is used to terminate a [Workflow Execution](/concepts/what-is-a-workflow-execution).
Canceling a running Workflow Execution records a `WorkflowExecutionTerminated` event as the closing Event in the workflow's Event History.
Workflow code is oblivious to termination. Use `temporal workflow cancel` if you need to perform cleanup in your workflow.

Executions may be terminated by [ID](/concepts/what-is-a-workflow-id) with an optional reason:
```
temporal workflow terminate [--reason my-reason] --workflow-id MyWorkflowId
```

...or in bulk via a visibility query [list filter](/concepts/what-is-a-list-filter):
```
temporal workflow terminate --query=MyQuery
```

Use the options listed below to change the behavior of this command.

#### Options

* `--workflow-id`, `-w` (string) - Workflow Id. Either this or query must be set.
* `--run-id`, `-r` (string) - Run Id. Cannot be set when query is set.
* `--query`, `-q` (string) - Start a batch to terminate Workflow Executions with given List Filter. Either this or Workflow Id must be set.
* `--reason` (string) - Reason for termination. Defaults to message with the current user's name.
* `--yes`, `-y` (bool) - Confirm prompt to perform batch. Only allowed if query is present.
Comment on lines +406 to +410
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* `--workflow-id`, `-w` (string) - Workflow Id. Either this or query must be set.
* `--run-id`, `-r` (string) - Run Id. Cannot be set when query is set.
* `--query`, `-q` (string) - Start a batch to terminate Workflow Executions with given List Filter. Either this or Workflow Id must be set.
* `--reason` (string) - Reason for termination. Defaults to message with the current user's name.
* `--yes`, `-y` (bool) - Confirm prompt to perform batch. Only allowed if query is present.
Includes options set for [single workflow or batch](#options-set-for-single-workflow-or-batch).

Or are you making this a completely separate options set just because of the English description of reason?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that #options-set-for-single-workflow-or-batch mentions signal and reason is applied only to batch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's the discussion Roey and I had about this @cretz: #436 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can make docs more generic if we need to, but I have no problem copy/pasting just for docs-only reasons if we have to like here.


### temporal workflow trace: Trace progress of a Workflow Execution and its children.

TODO

### temporal workflow update: Updates a running workflow synchronously.

TODO
TODO
Loading