From c19d06e616aee65041a17c5368150b474f057032 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Tue, 26 Mar 2024 07:22:53 -0500 Subject: [PATCH] Implement workflow count, temporarily remove workflow trace --- temporalcli/commands.gen.go | 30 ++------ temporalcli/commands.workflow.go | 4 - temporalcli/commands.workflow_view.go | 48 +++++++++++- temporalcli/commands.workflow_view_test.go | 89 ++++++++++++++++++++++ temporalcli/commandsmd/commands.md | 12 +-- 5 files changed, 149 insertions(+), 34 deletions(-) diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 0546d37e2..ee0370bc4 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -1624,7 +1624,6 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) * s.Command.AddCommand(&NewTemporalWorkflowStackCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowStartCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowTerminateCommand(cctx, &s).Command) - s.Command.AddCommand(&NewTemporalWorkflowTraceCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowUpdateCommand(cctx, &s).Command) s.ClientOptions.buildFlags(cctx, s.Command.PersistentFlags()) return &s @@ -1660,6 +1659,7 @@ func NewTemporalWorkflowCancelCommand(cctx *CommandContext, parent *TemporalWork type TemporalWorkflowCountCommand struct { Parent *TemporalWorkflowCommand Command cobra.Command + Query string } func NewTemporalWorkflowCountCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowCountCommand { @@ -1668,8 +1668,13 @@ func NewTemporalWorkflowCountCommand(cctx *CommandContext, parent *TemporalWorkf s.Command.DisableFlagsInUseLine = true s.Command.Use = "count [flags]" s.Command.Short = "Count Workflow Executions." - s.Command.Long = "TODO" + if hasHighlighting { + s.Command.Long = "The \x1b[1mtemporal workflow count\x1b[0m command returns a count of Workflow Executions.\n\nUse the options listed below to change the command's behavior." + } else { + s.Command.Long = "The `temporal workflow count` command returns a count of Workflow Executions.\n\nUse the options listed below to change the command's behavior." + } s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVarP(&s.Query, "query", "q", "", "Filter results using a SQL-like query.") s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { cctx.Options.Fail(err) @@ -2147,27 +2152,6 @@ func NewTemporalWorkflowTerminateCommand(cctx *CommandContext, parent *TemporalW return &s } -type TemporalWorkflowTraceCommand struct { - Parent *TemporalWorkflowCommand - Command cobra.Command -} - -func NewTemporalWorkflowTraceCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowTraceCommand { - var s TemporalWorkflowTraceCommand - s.Parent = parent - s.Command.DisableFlagsInUseLine = true - s.Command.Use = "trace [flags]" - s.Command.Short = "Trace progress of a Workflow Execution and its children." - s.Command.Long = "TODO" - s.Command.Args = cobra.NoArgs - s.Command.Run = func(c *cobra.Command, args []string) { - if err := s.run(cctx, args); err != nil { - cctx.Options.Fail(err) - } - } - return &s -} - type TemporalWorkflowUpdateCommand struct { Parent *TemporalWorkflowCommand Command cobra.Command diff --git a/temporalcli/commands.workflow.go b/temporalcli/commands.workflow.go index 4bd5f7b47..c4b6922dc 100644 --- a/temporalcli/commands.workflow.go +++ b/temporalcli/commands.workflow.go @@ -187,10 +187,6 @@ func (c *TemporalWorkflowTerminateCommand) run(cctx *CommandContext, _ []string) return nil } -func (*TemporalWorkflowTraceCommand) run(*CommandContext, []string) error { - return fmt.Errorf("TODO") -} - func (c *TemporalWorkflowUpdateCommand) run(cctx *CommandContext, args []string) error { cl, err := c.Parent.ClientOptions.dialClient(cctx) if err != nil { diff --git a/temporalcli/commands.workflow_view.go b/temporalcli/commands.workflow_view.go index 145fa1313..8498bc4c6 100644 --- a/temporalcli/commands.workflow_view.go +++ b/temporalcli/commands.workflow_view.go @@ -14,6 +14,7 @@ import ( "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" + "go.temporal.io/sdk/converter" ) func (c *TemporalWorkflowDescribeCommand) run(cctx *CommandContext, args []string) error { @@ -265,8 +266,51 @@ func (c *TemporalWorkflowListCommand) pageFetcher( } } -func (*TemporalWorkflowCountCommand) run(*CommandContext, []string) error { - return fmt.Errorf("TODO") +func (c *TemporalWorkflowCountCommand) run(cctx *CommandContext, _ []string) error { + cl, err := c.Parent.ClientOptions.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + resp, err := cl.WorkflowService().CountWorkflowExecutions(cctx, &workflowservice.CountWorkflowExecutionsRequest{ + Namespace: c.Parent.Namespace, + Query: c.Query, + }) + if err != nil { + return err + } + + // Just dump response on JSON, otherwise print total and groups + if cctx.JSONOutput { + // Shorthand does not apply to search attributes currently, so we're going + // to remove the "type" from the metadata encoding on group values to make + // it apply + for _, group := range resp.Groups { + for _, payload := range group.GroupValues { + delete(payload.GetMetadata(), "type") + } + } + return cctx.Printer.PrintStructured(resp, printer.StructuredOptions{}) + } + + cctx.Printer.Printlnf("Total: %v", resp.Count) + for _, group := range resp.Groups { + // Payload values are search attributes, so we can use the default converter + var valueStr string + for _, payload := range group.GroupValues { + var value any + if err := converter.GetDefaultDataConverter().FromPayload(payload, &value); err != nil { + value = fmt.Sprintf("", err) + } + if valueStr != "" { + valueStr += ", " + } + valueStr += fmt.Sprintf("%v", value) + } + cctx.Printer.Printlnf("Group total: %v, values: %v", group.Count, valueStr) + } + return nil } func (c *TemporalWorkflowShowCommand) run(cctx *CommandContext, args []string) error { diff --git a/temporalcli/commands.workflow_view_test.go b/temporalcli/commands.workflow_view_test.go index 3bc6a5bf7..088924794 100644 --- a/temporalcli/commands.workflow_view_test.go +++ b/temporalcli/commands.workflow_view_test.go @@ -5,9 +5,11 @@ import ( "encoding/json" "fmt" "strconv" + "strings" "time" "github.com/temporalio/cli/temporalcli" + "go.temporal.io/api/enums/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" "go.temporal.io/sdk/workflow" @@ -328,3 +330,90 @@ func (s *SharedServerSuite) TestWorkflow_List() { s.ContainsOnSameLine(out, "name", "DevWorkflow") s.ContainsOnSameLine(out, "status", "WORKFLOW_EXECUTION_STATUS_COMPLETED") } + +func (s *SharedServerSuite) TestWorkflow_Count() { + s.Worker.OnDevWorkflow(func(ctx workflow.Context, shouldComplete any) (any, error) { + // Only complete if shouldComplete is a true bool + shouldCompleteBool, _ := shouldComplete.(bool) + return nil, workflow.Await(ctx, func() bool { return shouldCompleteBool }) + }) + + // Create 3 that complete and 2 that don't + for i := 0; i < 5; i++ { + _, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{TaskQueue: s.Worker.Options.TaskQueue}, + DevWorkflow, + i < 3, + ) + s.NoError(err) + } + + // List and confirm they are all there in expected statuses + s.Eventually( + func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "TaskQueue = '" + s.Worker.Options.TaskQueue + "'", + }) + s.NoError(err) + var completed, running int + for _, exec := range resp.Executions { + if exec.Status == enums.WORKFLOW_EXECUTION_STATUS_COMPLETED { + completed++ + } else if exec.Status == enums.WORKFLOW_EXECUTION_STATUS_RUNNING { + running++ + } + } + return completed == 3 && running == 2 + }, + 5*time.Second, + 100*time.Millisecond, + ) + + // Simple count w/out grouping + res := s.Execute( + "workflow", "count", + "--address", s.Address(), + "--query", "TaskQueue = '"+s.Worker.Options.TaskQueue+"'", + ) + s.NoError(res.Err) + out := res.Stdout.String() + s.Equal("Total: 5", strings.TrimSpace(out)) + + // Grouped + res = s.Execute( + "workflow", "count", + "--address", s.Address(), + "--query", "TaskQueue = '"+s.Worker.Options.TaskQueue+"' GROUP BY ExecutionStatus", + ) + s.NoError(res.Err) + out = res.Stdout.String() + s.Contains(out, "Total: 5") + s.Contains(out, "Group total: 2, values: Running") + s.Contains(out, "Group total: 3, values: Completed") + + // Simple count w/out grouping JSON + res = s.Execute( + "workflow", "count", + "--address", s.Address(), + "--query", "TaskQueue = '"+s.Worker.Options.TaskQueue+"'", + "-o", "json", + ) + s.NoError(res.Err) + out = res.Stdout.String() + // Proto JSON makes this count a string + s.Contains(out, `"count": "5"`) + + // Grouped JSON + res = s.Execute( + "workflow", "count", + "--address", s.Address(), + "--query", "TaskQueue = '"+s.Worker.Options.TaskQueue+"' GROUP BY ExecutionStatus", + "-o", "jsonl", + ) + s.NoError(res.Err) + out = res.Stdout.String() + s.Contains(out, `"count":"5"`) + s.Contains(out, `{"groupValues":["Running"],"count":"2"}`) + s.Contains(out, `{"groupValues":["Completed"],"count":"3"}`) +} diff --git a/temporalcli/commandsmd/commands.md b/temporalcli/commandsmd/commands.md index 89b3178a0..5df328753 100644 --- a/temporalcli/commandsmd/commands.md +++ b/temporalcli/commandsmd/commands.md @@ -723,7 +723,13 @@ Includes options set for [single workflow or batch](#options-set-single-workflow ### temporal workflow count: Count Workflow Executions. -TODO +The `temporal workflow count` command returns a count of [Workflow Executions](/concepts/what-is-a-workflow-execution). + +Use the options listed below to change the command's behavior. + +#### Options + +* `--query`, `-q` (string) - Filter results using a SQL-like query. ### temporal workflow delete: Deletes a Workflow Execution. @@ -1018,10 +1024,6 @@ Use the options listed below to change the behavior of this command. * `--reason` (string) - Reason for termination. Defaults to message with the current user's name. * `--yes`, `-y` (bool) - Confirm prompt to perform batch. Only allowed if query is present. -### temporal workflow trace: Trace progress of a Workflow Execution and its children. - -TODO - ### temporal workflow update: Updates a running workflow synchronously. The `temporal workflow update` command is used to synchronously [Update](/concepts/what-is-an-update) a