Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement workflow show #458

Merged
merged 9 commits into from
Feb 16, 2024
Merged
6 changes: 2 additions & 4 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,8 +862,7 @@ type TemporalWorkflowShowCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
WorkflowReferenceOptions
ResetPoints bool
Follow bool
Follow bool
}

func NewTemporalWorkflowShowCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowShowCommand {
Expand All @@ -879,8 +878,7 @@ func NewTemporalWorkflowShowCommand(cctx *CommandContext, parent *TemporalWorkfl
}
s.Command.Args = cobra.NoArgs
s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().BoolVar(&s.ResetPoints, "reset-points", false, "Only show auto-reset points.")
s.Command.Flags().BoolVar(&s.Follow, "follow", false, "Follow the progress of a Workflow Execution if it goes to a new run.")
s.Command.Flags().BoolVarP(&s.Follow, "follow", "f", false, "Follow the progress of a Workflow Execution in real time (does not apply to JSON output).")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down
9 changes: 8 additions & 1 deletion temporalcli/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,14 @@ func (c *TemporalCommand) initCommand(cctx *CommandContext) {
if c.Color.Value == "never" || c.Color.Value == "always" {
color.NoColor = c.Color.Value == "never"
}
return c.preRun(cctx)

res := c.preRun(cctx)

// Always disable color if JSON output is on (must be run after preRun so JSONOutput is set)
if cctx.JSONOutput {
color.NoColor = true
}
return res
}
c.Command.PersistentPostRun = func(*cobra.Command, []string) {
color.NoColor = origNoColor
Expand Down
51 changes: 42 additions & 9 deletions temporalcli/commands.workflow_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (c *TemporalWorkflowExecuteCommand) run(cctx *CommandContext, args []string
workflowID: run.GetID(),
runID: run.GetRunID(),
includeDetails: c.EventDetails,
follow: true,
}
if err := iter.print(cctx.Printer); err != nil && cctx.Err() == nil {
return fmt.Errorf("displaying history failed: %w", err)
Expand Down Expand Up @@ -165,6 +166,9 @@ func printTextResult(
closeEvent *history.HistoryEvent,
duration time.Duration,
) error {
if closeEvent == nil {
return nil
}
cctx.Printer.Println(color.MagentaString("Results:"))
result := struct {
RunTime string `cli:",cardOmitEmpty"`
Expand Down Expand Up @@ -359,6 +363,10 @@ type structuredHistoryIter struct {
workflowID string
runID string
includeDetails bool
// If set true, long poll the history for updates
follow bool
// If and when the iterator encounters a workflow-terminating event, it will store it here
wfResult *history.HistoryEvent

// Internal
iter client.HistoryEventIterator
Expand Down Expand Up @@ -388,18 +396,13 @@ type structuredHistoryEvent struct {
var structuredHistoryEventType = reflect.TypeOf(structuredHistoryEvent{})

func (s *structuredHistoryIter) Next() (any, error) {
// Load iter
if s.iter == nil {
s.iter = s.client.GetWorkflowHistory(s.ctx, s.workflowID, s.runID, true, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
}
if !s.iter.HasNext() {
return nil, nil
}
event, err := s.iter.Next()
event, err := s.NextRawEvent()
if err != nil {
return nil, err
}

if event == nil {
return nil, nil
}
// Build data
data := structuredHistoryEvent{
ID: event.EventId,
Expand All @@ -423,3 +426,33 @@ func (s *structuredHistoryIter) Next() (any, error) {
}
return data, nil
}

func (s *structuredHistoryIter) NextRawEvent() (*history.HistoryEvent, error) {
// Load iter
if s.iter == nil {
s.iter = s.client.GetWorkflowHistory(
s.ctx, s.workflowID, s.runID, s.follow, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
}
if !s.iter.HasNext() {
return nil, nil
}
event, err := s.iter.Next()
if err != nil {
return nil, err
}
if isWorkflowTerminatingEvent(event.EventType) {
s.wfResult = event
}
return event, nil
}

func isWorkflowTerminatingEvent(t enums.EventType) bool {
switch t {
case enums.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED,
enums.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED,
enums.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT,
enums.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
return true
}
return false
}
1 change: 0 additions & 1 deletion temporalcli/commands.workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,6 @@ func (s *SharedServerSuite) TestWorkflow_Query_SingleWorkflowSuccessJSON() {
}

func (s *SharedServerSuite) testQueryWorkflow(json bool) {
// Make workflow wait for signal and then return it
s.Worker.OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
err := workflow.SetQueryHandler(ctx, "my-query", func(arg string) (any, error) {
retme := struct {
Expand Down
46 changes: 44 additions & 2 deletions temporalcli/commands.workflow_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,48 @@ func (*TemporalWorkflowCountCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
}

func (*TemporalWorkflowShowCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
func (c *TemporalWorkflowShowCommand) run(cctx *CommandContext, args []string) error {
// Call describe
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

// Print history
iter := &structuredHistoryIter{
ctx: cctx,
client: cl,
workflowID: c.WorkflowId,
runID: c.RunId,
includeDetails: true,
follow: c.Follow,
}
if !cctx.JSONOutput {
cctx.Printer.Println(color.MagentaString("Progress:"))
if err := iter.print(cctx.Printer); err != nil {
return fmt.Errorf("displaying history failed: %w", err)
}
if err := printTextResult(cctx, iter.wfResult, 0); err != nil {
return err
}
} else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is important to note that IIUC workflow show is the primary way people dump workflow history for use in replayers. So for JSON, we need to dump exact history format (i.e. create the history.History proto object with its events and send that to PrintStructured, which is close to what's here but has to be the full thing)

Copy link
Member Author

@Sushisource Sushisource Feb 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is exactly how the old CLI did it FYI (for the outer wrapper with 'events' field, but of course it used real HistoryEvent protos which is more important). But, making it be a literal history proto makes more sense anyway.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But, making it be a literal history proto makes more sense anyway.

They used that in printReplayableHistory which makes sense (it's simple code just for JSON), but for the non-JSON, yeah the lazy stuff for text is a bit more difficult. Feel free to steal anything from workflow execute which does similar.

events := make([]*history.HistoryEvent, 0)
for {
e, err := iter.NextRawEvent()
if err != nil {
return fmt.Errorf("failed getting next history event: %w", err)
}
if e == nil {
break
}
events = append(events, e)
}
outStruct := history.History{}
outStruct.Events = events
if err := cctx.Printer.PrintStructured(&outStruct, printer.StructuredOptions{}); err != nil {
return fmt.Errorf("failed printing structured output: %w", err)
}
}
return nil
}
146 changes: 146 additions & 0 deletions temporalcli/commands.workflow_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,149 @@ func (s *SharedServerSuite) TestWorkflow_Describe_Completed() {
s.NotNil(jsonOut["closeEvent"])
s.Equal(map[string]any{"foo": "bar"}, jsonOut["result"])
}

func (s *SharedServerSuite) TestWorkflow_Show_Follow() {
s.Worker.OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
sigs := 0
for {
workflow.GetSignalChannel(ctx, "my-signal").Receive(ctx, nil)
sigs += 1
if sigs == 2 {
break
}
}
return "hi!", nil
})

// Start the workflow
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{TaskQueue: s.Worker.Options.TaskQueue},
DevWorkflow,
"ignored",
)
s.NoError(err)

doneFollowingCh := make(chan struct{})
// Follow the workflow
go func() {
res := s.Execute(
"workflow", "show",
"--address", s.Address(),
"-w", run.GetID(),
"--follow",
)
s.NoError(res.Err)
out := res.Stdout.String()
s.Contains(out, "my-signal")
s.Contains(out, "Result \"hi!\"")
close(doneFollowingCh)
}()

// Send signals to complete
s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), "", "my-signal", nil))
s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), "", "my-signal", nil))

// Ensure following completes
<-doneFollowingCh
s.NoError(run.Get(s.Context, nil))
}

func (s *SharedServerSuite) TestWorkflow_Show_NoFollow() {
s.Worker.OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
sigs := 0
for {
workflow.GetSignalChannel(ctx, "my-signal").Receive(ctx, nil)
sigs += 1
if sigs == 2 {
break
}
}
return "hi!", nil
})

// Start the workflow
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{TaskQueue: s.Worker.Options.TaskQueue},
DevWorkflow,
"ignored",
)
s.NoError(err)

res := s.Execute(
"workflow", "show",
"--address", s.Address(),
"-w", run.GetID(),
)
s.NoError(res.Err)
out := res.Stdout.String()
s.NotContains(out, "my-signal")
s.NotContains(out, "Results:")

// Send signals to complete
s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), "", "my-signal", nil))
s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), "", "my-signal", nil))
s.NoError(run.Get(s.Context, nil))

res = s.Execute(
"workflow", "show",
"--address", s.Address(),
"-w", run.GetID(),
)
s.NoError(res.Err)
out = res.Stdout.String()
s.Contains(out, "my-signal")
s.Contains(out, "Result \"hi!\"")
}

func (s *SharedServerSuite) TestWorkflow_Show_JSON() {
s.Worker.OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
sigs := 0
for {
workflow.GetSignalChannel(ctx, "my-signal").Receive(ctx, nil)
sigs += 1
if sigs == 2 {
break
}
}
return "hi!", nil
})

// Start the workflow
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{TaskQueue: s.Worker.Options.TaskQueue},
DevWorkflow,
"ignored",
)
s.NoError(err)

res := s.Execute(
"workflow", "show",
"--address", s.Address(),
"-w", run.GetID(),
"-o", "json",
)
s.NoError(res.Err)
out := res.Stdout.String()
s.Contains(out, `"events": [`)
s.Contains(out, `"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED"`)

// Send signals to complete
s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), "", "my-signal", nil))
s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), "", "my-signal", nil))
s.NoError(run.Get(s.Context, nil))

res = s.Execute(
"workflow", "show",
"--address", s.Address(),
"-w", run.GetID(),
"-o", "json",
)
s.NoError(res.Err)
out = res.Stdout.String()
s.Contains(out, `"events": [`)
s.Contains(out, `"signalName": "my-signal"`)
s.NotContains(out, "Results:")
}
4 changes: 2 additions & 2 deletions temporalcli/commandsmd/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,8 @@ Use the options listed below to change the command's behavior.

#### Options

* `--reset-points` (bool) - Only show auto-reset points.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like old CLI accepted this but never used it, 👍 removing

* `--follow` (bool) - Follow the progress of a Workflow Execution if it goes to a new run.
* `--follow`, `-f` (bool) - Follow the progress of a Workflow Execution in real time (does not apply
to JSON output).

Includes options set for [workflow reference](#options-set-for-workflow-reference).

Expand Down
Loading