From f8b89c8001c5cdb4111a6a28d3df74448e197580 Mon Sep 17 00:00:00 2001 From: Yuri Date: Tue, 4 Feb 2025 10:42:40 -0800 Subject: [PATCH 1/3] Add batch activity unpause. --- temporalcli/commands.activity.go | 79 ++++++-- temporalcli/commands.activity_test.go | 271 +++++++++++++++++--------- temporalcli/commands.gen.go | 29 ++- temporalcli/commandsgen/commands.yml | 55 +++++- 4 files changed, 309 insertions(+), 125 deletions(-) diff --git a/temporalcli/commands.activity.go b/temporalcli/commands.activity.go index 442a0a5b..b6356169 100644 --- a/temporalcli/commands.activity.go +++ b/temporalcli/commands.activity.go @@ -6,6 +6,7 @@ import ( "github.com/temporalio/cli/temporalcli/internal/printer" activitypb "go.temporal.io/api/activity/v1" + "go.temporal.io/api/batch/v1" "go.temporal.io/api/common/v1" "go.temporal.io/api/failure/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" @@ -230,28 +231,70 @@ func (c *TemporalActivityUnpauseCommand) run(cctx *CommandContext, args []string } defer cl.Close() - request := &workflowservice.UnpauseActivityRequest{ - Namespace: c.Parent.Namespace, - Execution: &common.WorkflowExecution{ - WorkflowId: c.WorkflowId, - RunId: c.RunId, - }, - ResetAttempts: c.ResetAttempts, - ResetHeartbeat: c.ResetHeartbeats, - Identity: c.Identity, + opts := SingleWorkflowOrBatchOptions{ + WorkflowId: c.WorkflowId, + RunId: c.RunId, + Query: c.Query, + Reason: c.Reason, + Yes: c.Yes, + Rps: c.Rps, } - if c.ActivityType != "" { - request.Activity = &workflowservice.UnpauseActivityRequest_Type{Type: c.ActivityType} - } else if c.ActivityId != "" { - request.Activity = &workflowservice.UnpauseActivityRequest_Id{Id: c.ActivityId} - } else { - return fmt.Errorf("either Activity Type or Activity Id must be provided") + exec, batchReq, err := opts.workflowExecOrBatch(cctx, c.Parent.Namespace, cl, singleOrBatchOverrides{ + // You're allowed to specify a reason when terminating a workflow + AllowReasonWithWorkflowID: true, + }) + if err != nil { + return err } - _, err = cl.WorkflowService().UnpauseActivity(cctx, request) - if err != nil { - return fmt.Errorf("unable to uppause an Activity: %w", err) + if exec != nil { // single workflow operation + request := &workflowservice.UnpauseActivityRequest{ + Namespace: c.Parent.Namespace, + Execution: &common.WorkflowExecution{ + WorkflowId: c.WorkflowId, + RunId: c.RunId, + }, + ResetAttempts: c.ResetAttempts, + ResetHeartbeat: c.ResetHeartbeats, + Jitter: durationpb.New(c.Jitter.Duration()), + Identity: c.Identity, + } + + if c.ActivityType != "" { + request.Activity = &workflowservice.UnpauseActivityRequest_Type{Type: c.ActivityType} + } else if c.ActivityId != "" { + request.Activity = &workflowservice.UnpauseActivityRequest_Id{Id: c.ActivityId} + } else { + return fmt.Errorf("either Activity Type or Activity Id must be provided") + } + + _, err = cl.WorkflowService().UnpauseActivity(cctx, request) + if err != nil { + return fmt.Errorf("unable to uppause an Activity: %w", err) + } + } else { // batch operation + unpauseActivitiesOperation := &batch.BatchOperationUnpauseActivities{ + Identity: clientIdentity(), + ResetAttempts: c.ResetAttempts, + ResetHeartbeat: c.ResetHeartbeats, + Jitter: durationpb.New(c.Jitter.Duration()), + } + if c.ActivityType != "" { + unpauseActivitiesOperation.Activity = &batch.BatchOperationUnpauseActivities_Type{Type: c.ActivityType} + } else if c.MatchAll == true { + unpauseActivitiesOperation.Activity = &batch.BatchOperationUnpauseActivities_MatchAll{MatchAll: true} + } else { + return fmt.Errorf("either Activity Type must be provided or MatchAll must be set to true") + } + + batchReq.Operation = &workflowservice.StartBatchOperationRequest_UnpauseActivitiesOperation{ + UnpauseActivitiesOperation: unpauseActivitiesOperation, + } + + if err := startBatchJob(cctx, cl, batchReq); err != nil { + return err + } } return nil diff --git a/temporalcli/commands.activity_test.go b/temporalcli/commands.activity_test.go index 7fbb8520..45ccb073 100644 --- a/temporalcli/commands.activity_test.go +++ b/temporalcli/commands.activity_test.go @@ -2,22 +2,33 @@ package temporalcli_test import ( "context" + "fmt" + "sync" + "sync/atomic" "time" "go.temporal.io/api/enums/v1" "go.temporal.io/api/history/v1" "go.temporal.io/api/serviceerror" + "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" + "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/workflow" + "google.golang.org/grpc" +) + +const ( + activityId string = "dev-activity-id" + activityType string = "DevActivity" + identity string = "MyIdentity" ) func (s *SharedServerSuite) TestActivity_Complete() { run := s.waitActivityStarted() wid := run.GetID() - aid := "dev-activity-id" - identity := "MyIdentity" res := s.Execute( "activity", "complete", - "--activity-id", aid, + "--activity-id", activityId, "--workflow-id", wid, "--result", "\"complete-activity-result\"", "--identity", identity, @@ -28,7 +39,7 @@ func (s *SharedServerSuite) TestActivity_Complete() { s.NoError(run.Get(s.Context, &actual)) s.Equal("complete-activity-result", actual) - started, completed, failed := s.getActivityEvents(wid, aid) + started, completed, failed := s.getActivityEvents(wid, activityId) s.NotNil(started) s.Nil(failed) s.NotNil(completed) @@ -39,13 +50,12 @@ func (s *SharedServerSuite) TestActivity_Complete() { func (s *SharedServerSuite) TestActivity_Fail() { run := s.waitActivityStarted() wid := run.GetID() - aid := "dev-activity-id" detail := "{\"myKey\": \"myValue\"}" reason := "MyReason" identity := "MyIdentity" res := s.Execute( "activity", "fail", - "--activity-id", aid, + "--activity-id", activityId, "--workflow-id", wid, "--run-id", run.GetRunID(), "--detail", detail, @@ -57,7 +67,7 @@ func (s *SharedServerSuite) TestActivity_Fail() { err := run.Get(s.Context, nil) s.NotNil(err) - started, completed, failed := s.getActivityEvents(wid, aid) + started, completed, failed := s.getActivityEvents(wid, activityId) s.NotNil(started) s.Nil(completed) s.NotNil(failed) @@ -71,18 +81,16 @@ func (s *SharedServerSuite) TestActivity_Fail() { func (s *SharedServerSuite) TestActivity_Complete_InvalidResult() { run := s.waitActivityStarted() - wid := run.GetID() - aid := "dev-activity-id" res := s.Execute( "activity", "complete", - "--activity-id", aid, - "--workflow-id", wid, + "--activity-id", activityId, + "--workflow-id", run.GetID(), "--result", "{not json}", "--address", s.Address(), ) s.ErrorContains(res.Err, "is not valid JSON") - started, completed, failed := s.getActivityEvents(wid, aid) + started, completed, failed := s.getActivityEvents(run.GetID(), activityId) s.Nil(started) s.Nil(completed) s.Nil(failed) @@ -91,17 +99,16 @@ func (s *SharedServerSuite) TestActivity_Complete_InvalidResult() { func (s *SharedServerSuite) TestActivity_Fail_InvalidDetail() { run := s.waitActivityStarted() wid := run.GetID() - aid := "dev-activity-id" res := s.Execute( "activity", "fail", - "--activity-id", aid, + "--activity-id", activityId, "--workflow-id", wid, "--detail", "{not json}", "--address", s.Address(), ) s.ErrorContains(res.Err, "is not valid JSON") - started, completed, failed := s.getActivityEvents(wid, aid) + started, completed, failed := s.getActivityEvents(wid, activityId) s.Nil(started) s.Nil(completed) s.Nil(failed) @@ -110,12 +117,10 @@ func (s *SharedServerSuite) TestActivity_Fail_InvalidDetail() { func (s *SharedServerSuite) TestActivityOptionsUpdate_Accept() { run := s.waitActivityStarted() wid := run.GetID() - aid := "dev-activity-id" - identity := "MyIdentity" res := s.Execute( "activity", "update-options", - "--activity-id", aid, + "--activity-id", activityId, "--workflow-id", wid, "--run-id", run.GetRunID(), "--identity", identity, @@ -145,14 +150,11 @@ func (s *SharedServerSuite) TestActivityOptionsUpdate_Accept() { func (s *SharedServerSuite) TestActivityOptionsUpdate_Partial() { run := s.waitActivityStarted() - wid := run.GetID() - aid := "dev-activity-id" - identity := "MyIdentity" res := s.Execute( "activity", "update-options", - "--activity-id", aid, - "--workflow-id", wid, + "--activity-id", activityId, + "--workflow-id", run.GetID(), "--run-id", run.GetRunID(), "--identity", identity, "--task-queue", "new-task-queue", @@ -182,114 +184,80 @@ func (s *SharedServerSuite) TestActivityOptionsUpdate_Partial() { s.ContainsOnSameLine(out, "BackoffCoefficient", "2") } -func (s *SharedServerSuite) TestActivityPauseUnpause() { - run := s.waitActivityStarted() - wid := run.GetID() - aid := "dev-activity-id" - identity := "MyIdentity" - - res := s.Execute( - "activity", "pause", - "--activity-id", aid, - "--workflow-id", wid, +func sendActivityCommand(command string, run client.WorkflowRun, s *SharedServerSuite, extraArgs ...string) *CommandResult { + args := []string{ + "activity", command, + "--workflow-id", run.GetID(), "--run-id", run.GetRunID(), "--identity", identity, "--address", s.Address(), - ) + } + + args = append(args, extraArgs...) + res := s.Execute(args...) + return res +} + +func (s *SharedServerSuite) TestActivityPauseUnpause() { + run := s.waitActivityStarted() + + res := sendActivityCommand("pause", run, s, "--activity-id", activityId) s.NoError(res.Err) - res = s.Execute( - "activity", "unpause", - "--activity-id", aid, - "--workflow-id", wid, - "--run-id", run.GetRunID(), - "--identity", identity, - "--address", s.Address(), - "--reset-attempts", - ) + s.Eventually(func() bool { + resp, err := s.Client.DescribeWorkflowExecution(s.Context, run.GetID(), run.GetRunID()) + s.NoError(err) + if resp.GetPendingActivities() == nil { + return false + } + return len(resp.PendingActivities) > 0 && resp.PendingActivities[0].Paused + }, 5*time.Second, 100*time.Millisecond) + res = sendActivityCommand("unpause", run, s, "--activity-id", activityId, "--reset-attempts") s.NoError(res.Err) + + s.Eventually(func() bool { + resp, err := s.Client.DescribeWorkflowExecution(s.Context, run.GetID(), run.GetRunID()) + s.NoError(err) + if resp.GetPendingActivities() == nil { + return false + } + return len(resp.PendingActivities) > 0 && !resp.PendingActivities[0].Paused + }, 5*time.Second, 100*time.Millisecond) } func (s *SharedServerSuite) TestActivityPauseUnpauseByType() { run := s.waitActivityStarted() - wid := run.GetID() - at := "DevActivity" - identity := "MyIdentity" - - res := s.Execute( - "activity", "pause", - "--activity-type", at, - "--workflow-id", wid, - "--run-id", run.GetRunID(), - "--identity", identity, - "--address", s.Address(), - ) + res := sendActivityCommand("pause", run, s, "--activity-type", activityType) s.NoError(res.Err) - res = s.Execute( - "activity", "unpause", - "--activity-type", at, - "--workflow-id", wid, - "--run-id", run.GetRunID(), - "--identity", identity, - "--address", s.Address(), - "--reset-attempts", - ) - + res = sendActivityCommand("unpause", run, s, "--activity-type", activityType, "--reset-attempts") s.NoError(res.Err) } -func (s *SharedServerSuite) TestActivityCommandFailed_NoActivityTpeOrid() { +func (s *SharedServerSuite) TestActivityCommandFailed_NoActivityTpeOrId() { run := s.waitActivityStarted() - wid := run.GetID() - identity := "MyIdentity" commands := []string{"pause", "unpause", "reset"} for _, command := range commands { // should fail because both activity-id and activity-type are not provided - res := s.Execute( - "activity", command, - "--workflow-id", wid, - "--run-id", run.GetRunID(), - "--identity", identity, - "--address", s.Address(), - ) + res := sendActivityCommand(command, run, s) s.Error(res.Err) } } func (s *SharedServerSuite) TestActivityReset() { run := s.waitActivityStarted() - wid := run.GetID() - aid := "dev-activity-id" - identity := "MyIdentity" - - res := s.Execute( - "activity", "reset", - "--activity-id", aid, - "--workflow-id", wid, - "--run-id", run.GetRunID(), - "--identity", identity, - "--address", s.Address(), - ) + res := sendActivityCommand("reset", run, s, "--activity-id", activityId) s.NoError(res.Err) // make sure we receive a server response out := res.Stdout.String() s.ContainsOnSameLine(out, "ServerResponse", "true") // reset should fail because activity is not found - res = s.Execute( - "activity", "reset", - "--activity-id", "fake_id", - "--workflow-id", wid, - "--run-id", run.GetRunID(), - "--identity", identity, - "--address", s.Address(), - ) - + res = sendActivityCommand("reset", run, s, "--activity-id", "fake_id") s.Error(res.Err) // make sure we receive a NotFound error from the server` var notFound *serviceerror.NotFound @@ -318,6 +286,22 @@ func (s *SharedServerSuite) waitActivityStarted() client.WorkflowRun { return run } +func waitWorkflowStarted(s *SharedServerSuite) client.WorkflowRun { + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{TaskQueue: s.Worker().Options.TaskQueue}, + DevWorkflow, + "ignored", + ) + s.NoError(err) + s.Eventually(func() bool { + resp, err := s.Client.DescribeWorkflowExecution(s.Context, run.GetID(), run.GetRunID()) + s.NoError(err) + return len(resp.PendingActivities) > 0 + }, 5*time.Second, 100*time.Millisecond) + return run +} + func (s *SharedServerSuite) getActivityEvents(workflowID, activityID string) ( started *history.ActivityTaskStartedEventAttributes, completed *history.ActivityTaskCompletedEventAttributes, @@ -338,3 +322,98 @@ func (s *SharedServerSuite) getActivityEvents(workflowID, activityID string) ( } return started, completed, failed } + +func (s *SharedServerSuite) TestUnpauseActivity_BatchSuccess() { + var failActivity atomic.Bool + failActivity.Store(true) + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + time.Sleep(100 * time.Millisecond) + if failActivity.Load() { + return nil, fmt.Errorf("update workflow received non-float input") + } + return nil, nil + }) + + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + // override the activity options to allow activity to constantly fail + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + ActivityID: activityId, + StartToCloseTimeout: 1 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + MaximumAttempts: 0, + }, + }) + var res any + err := workflow.ExecuteActivity(ctx, DevActivity).Get(ctx, &res) + return res, err + }) + + run1 := waitWorkflowStarted(s) + run2 := waitWorkflowStarted(s) + + // Wait for all to appear in list + query := fmt.Sprintf("WorkflowId = '%s' OR WorkflowId = '%s'", run1.GetID(), run2.GetID()) + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: query, + }) + s.NoError(err) + return len(resp.Executions) == 2 + }, 3*time.Second, 100*time.Millisecond) + + // Pause the activities + res := sendActivityCommand("pause", run1, s, "--activity-id", activityId) + s.NoError(res.Err) + res = sendActivityCommand("pause", run2, s, "--activity-id", activityId) + s.NoError(res.Err) + + // wait for activities to be paused + s.Eventually(func() bool { + resp, err := s.Client.DescribeWorkflowExecution(s.Context, run1.GetID(), run1.GetRunID()) + s.NoError(err) + if resp.GetPendingActivities() == nil { + return false + } + return len(resp.PendingActivities) > 0 && resp.PendingActivities[0].Paused + }, 5*time.Second, 100*time.Millisecond) + + s.Eventually(func() bool { + resp, err := s.Client.DescribeWorkflowExecution(s.Context, run2.GetID(), run2.GetRunID()) + s.NoError(err) + if resp.GetPendingActivities() == nil { + return false + } + return len(resp.PendingActivities) > 0 && resp.PendingActivities[0].Paused + }, 5*time.Second, 100*time.Millisecond) + + var lastRequestLock sync.Mutex + var startBatchRequest *workflowservice.StartBatchOperationRequest + s.CommandHarness.Options.AdditionalClientGRPCDialOptions = append( + s.CommandHarness.Options.AdditionalClientGRPCDialOptions, + grpc.WithChainUnaryInterceptor(func( + ctx context.Context, + method string, req, reply any, + cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption, + ) error { + lastRequestLock.Lock() + if r, ok := req.(*workflowservice.StartBatchOperationRequest); ok { + startBatchRequest = r + } + lastRequestLock.Unlock() + return invoker(ctx, method, req, reply, cc, opts...) + }), + ) + + failActivity.Store(false) + + // Send batch activity unpause + cmdRes := s.Execute("activity", "unpause", + "--rps", "1", + "--address", s.Address(), + "--query", query, + "--reason", "unpause-test", + "--yes", "--match-all", + ) + s.NoError(cmdRes.Err) + s.NotEmpty(startBatchRequest.JobId) +} diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 3800b5a1..5c7856b6 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -486,14 +486,21 @@ func NewTemporalActivityResetCommand(cctx *CommandContext, parent *TemporalActiv } type TemporalActivityUnpauseCommand struct { - Parent *TemporalActivityCommand - Command cobra.Command - WorkflowReferenceOptions + Parent *TemporalActivityCommand + Command cobra.Command + WorkflowId string + RunId string ActivityId string ActivityType string Identity string ResetAttempts bool ResetHeartbeats bool + Query string + Reason string + MatchAll bool + Jitter Duration + Yes bool + Rps float32 } func NewTemporalActivityUnpauseCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityUnpauseCommand { @@ -503,17 +510,25 @@ func NewTemporalActivityUnpauseCommand(cctx *CommandContext, parent *TemporalAct s.Command.Use = "unpause [flags]" s.Command.Short = "Unpause an Activity" if hasHighlighting { - s.Command.Long = "Re-schedule a previously-paused Activity for execution.\n\nIf the Activity is not running and is past its retry timeout, it will be\nscheduled immediately. Otherwise, it will be scheduled after its retry\ntimeout expires. \n\nUse \x1b[1m--reset-attempts\x1b[0m to reset the number of previous run attempts to \nzero. For example, if an Activity is near the maximum number of attempts \nN specified in its retry policy, \x1b[1m--reset-attempts\x1b[0m will allow the \nActivity to be retried another N times after unpausing.\n\nUse \x1b[1m--reset-heartbeat\x1b[0m to reset the Activity's heartbeats. \n\nActivities can be specified by their Activity ID or Activity Type. \nOne of those parameters must be provided. If both are provided - Activity\nType will be used, and Activity ID will be ignored. \n\nSpecify the Activity ID or Type and Workflow IDs:\n\n\x1b[1mtemporal activity unpause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --reset-attempts\n --reset-heartbeats\x1b[0m" + s.Command.Long = "Re-schedule a previously-paused Activity for execution.\n\nIf the Activity is not running and is past its retry timeout, it will be\nscheduled immediately. Otherwise, it will be scheduled after its retry\ntimeout expires. \n\nUse \x1b[1m--reset-attempts\x1b[0m to reset the number of previous run attempts to \nzero. For example, if an Activity is near the maximum number of attempts \nN specified in its retry policy, \x1b[1m--reset-attempts\x1b[0m will allow the \nActivity to be retried another N times after unpausing.\n\nUse \x1b[1m--reset-heartbeat\x1b[0m to reset the Activity's heartbeats. \n\nActivities can be specified by their Activity ID or Activity Type. \nOne of those parameters must be provided. If both are provided - Activity\nType will be used, and Activity ID will be ignored.\n\nActivities can be unpaused in bulk via a visibility Query list filter:\n\n\x1b[1mtemporal activity unpause \\\n --query YourQuery \\\n --reason YourReasonForTermination\x1b[0m\n\n\nSpecify the Activity ID or Type and Workflow IDs:\n\n\x1b[1mtemporal activity unpause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --reset-attempts\n --reset-heartbeats\x1b[0m" } else { - s.Command.Long = "Re-schedule a previously-paused Activity for execution.\n\nIf the Activity is not running and is past its retry timeout, it will be\nscheduled immediately. Otherwise, it will be scheduled after its retry\ntimeout expires. \n\nUse `--reset-attempts` to reset the number of previous run attempts to \nzero. For example, if an Activity is near the maximum number of attempts \nN specified in its retry policy, `--reset-attempts` will allow the \nActivity to be retried another N times after unpausing.\n\nUse `--reset-heartbeat` to reset the Activity's heartbeats. \n\nActivities can be specified by their Activity ID or Activity Type. \nOne of those parameters must be provided. If both are provided - Activity\nType will be used, and Activity ID will be ignored. \n\nSpecify the Activity ID or Type and Workflow IDs:\n\n```\ntemporal activity unpause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --reset-attempts\n --reset-heartbeats\n```" + s.Command.Long = "Re-schedule a previously-paused Activity for execution.\n\nIf the Activity is not running and is past its retry timeout, it will be\nscheduled immediately. Otherwise, it will be scheduled after its retry\ntimeout expires. \n\nUse `--reset-attempts` to reset the number of previous run attempts to \nzero. For example, if an Activity is near the maximum number of attempts \nN specified in its retry policy, `--reset-attempts` will allow the \nActivity to be retried another N times after unpausing.\n\nUse `--reset-heartbeat` to reset the Activity's heartbeats. \n\nActivities can be specified by their Activity ID or Activity Type. \nOne of those parameters must be provided. If both are provided - Activity\nType will be used, and Activity ID will be ignored.\n\nActivities can be unpaused in bulk via a visibility Query list filter:\n\n```\ntemporal activity unpause \\\n --query YourQuery \\\n --reason YourReasonForTermination\n```\n\n\nSpecify the Activity ID or Type and Workflow IDs:\n\n```\ntemporal activity unpause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --reset-attempts\n --reset-heartbeats\n```" } s.Command.Args = cobra.NoArgs - s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "Activity ID to unpause.") + s.Command.Flags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow ID. You must set either --workflow-id or --query.") + s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. Can only be set with --workflow-id. Do not use with --query.") + s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "Activity ID to unpause. Can only be used without --query.") s.Command.Flags().StringVarP(&s.ActivityType, "activity-type", "g", "", "Activity Type to unpause.") s.Command.Flags().StringVar(&s.Identity, "identity", "", "Identity of the user submitting this request.") s.Command.Flags().BoolVar(&s.ResetAttempts, "reset-attempts", false, "Also reset the activity attempts.") s.Command.Flags().BoolVar(&s.ResetHeartbeats, "reset-heartbeats", false, "Reset the Activity's heartbeats. Only works with --reset-attempts.") - s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Flags().StringVarP(&s.Query, "query", "q", "", "Content for an SQL-like `QUERY` List Filter.") + s.Command.Flags().StringVar(&s.Reason, "reason", "", "Reason for unpause. Can only be used with --query.") + s.Command.Flags().BoolVar(&s.MatchAll, "match-all", false, "Every paused activity should be unpaused. Can only be used with --query.") + s.Jitter = 0 + s.Command.Flags().VarP(&s.Jitter, "jitter", "j", "The activity will start at random a time within the specified duration. Can only be used with --query.") + s.Command.Flags().BoolVarP(&s.Yes, "yes", "y", false, "Don't prompt to confirm termination. Can only be used with --query.") + s.Command.Flags().Float32Var(&s.Rps, "rps", 0, "Limit batch's requests per second. Can only be used with --query.") s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { cctx.Options.Fail(err) diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index dadec744..a70cbdea 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -437,7 +437,16 @@ commands: Activities can be specified by their Activity ID or Activity Type. One of those parameters must be provided. If both are provided - Activity - Type will be used, and Activity ID will be ignored. + Type will be used, and Activity ID will be ignored. + + Activities can be unpaused in bulk via a visibility Query list filter: + + ``` + temporal activity unpause \ + --query YourQuery \ + --reason YourReasonForTermination + ``` + Specify the Activity ID or Type and Workflow IDs: @@ -449,10 +458,21 @@ commands: --reset-heartbeats ``` options: + - name: workflow-id + short: w + type: string + description: | + Workflow ID. You must set either --workflow-id or --query. + - name: run-id + short: r + type: string + description: | + Run ID. Can only be set with --workflow-id. Do not use with --query. - name: activity-id short: a type: string - description: Activity ID to unpause. + description: | + Activity ID to unpause. Can only be used without --query. - name: activity-type short: g type: string @@ -467,8 +487,35 @@ commands: type: bool description: | Reset the Activity's heartbeats. Only works with --reset-attempts. - option-sets: - - workflow reference + - name: query + short: q + type: string + description: | + Content for an SQL-like `QUERY` List Filter. + - name: reason + type: string + description: | + Reason for unpause. Can only be used with --query. + - name: match-all + type: bool + description: | + Every paused activity should be unpaused. + Can only be used with --query. + - name: jitter + type: duration + short: j + description: | + The activity will start at random a time within the specified duration. + Can only be used with --query. + - name: yes + short: y + type: bool + description: | + Don't prompt to confirm termination. Can only be used with --query. + - name: rps + type: float + description: | + Limit batch's requests per second. Can only be used with --query. - name: temporal activity reset summary: Reset an Activity From d9d766c59f4710e475a8e6d06a775ca0b9e3ffc7 Mon Sep 17 00:00:00 2001 From: Yuri Date: Wed, 5 Feb 2025 09:19:34 -0800 Subject: [PATCH 2/3] work on comments --- temporalcli/commands.activity_test.go | 5 ++-- temporalcli/commands.gen.go | 20 ++++----------- temporalcli/commandsgen/commands.yml | 35 ++++----------------------- 3 files changed, 13 insertions(+), 47 deletions(-) diff --git a/temporalcli/commands.activity_test.go b/temporalcli/commands.activity_test.go index 45ccb073..a99495ba 100644 --- a/temporalcli/commands.activity_test.go +++ b/temporalcli/commands.activity_test.go @@ -404,8 +404,6 @@ func (s *SharedServerSuite) TestUnpauseActivity_BatchSuccess() { }), ) - failActivity.Store(false) - // Send batch activity unpause cmdRes := s.Execute("activity", "unpause", "--rps", "1", @@ -416,4 +414,7 @@ func (s *SharedServerSuite) TestUnpauseActivity_BatchSuccess() { ) s.NoError(cmdRes.Err) s.NotEmpty(startBatchRequest.JobId) + + // unblock the activities + failActivity.Store(false) } diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 5c7856b6..a73a480c 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -486,21 +486,16 @@ func NewTemporalActivityResetCommand(cctx *CommandContext, parent *TemporalActiv } type TemporalActivityUnpauseCommand struct { - Parent *TemporalActivityCommand - Command cobra.Command - WorkflowId string - RunId string + Parent *TemporalActivityCommand + Command cobra.Command + SingleWorkflowOrBatchOptions ActivityId string ActivityType string Identity string ResetAttempts bool ResetHeartbeats bool - Query string - Reason string MatchAll bool Jitter Duration - Yes bool - Rps float32 } func NewTemporalActivityUnpauseCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityUnpauseCommand { @@ -515,20 +510,15 @@ func NewTemporalActivityUnpauseCommand(cctx *CommandContext, parent *TemporalAct s.Command.Long = "Re-schedule a previously-paused Activity for execution.\n\nIf the Activity is not running and is past its retry timeout, it will be\nscheduled immediately. Otherwise, it will be scheduled after its retry\ntimeout expires. \n\nUse `--reset-attempts` to reset the number of previous run attempts to \nzero. For example, if an Activity is near the maximum number of attempts \nN specified in its retry policy, `--reset-attempts` will allow the \nActivity to be retried another N times after unpausing.\n\nUse `--reset-heartbeat` to reset the Activity's heartbeats. \n\nActivities can be specified by their Activity ID or Activity Type. \nOne of those parameters must be provided. If both are provided - Activity\nType will be used, and Activity ID will be ignored.\n\nActivities can be unpaused in bulk via a visibility Query list filter:\n\n```\ntemporal activity unpause \\\n --query YourQuery \\\n --reason YourReasonForTermination\n```\n\n\nSpecify the Activity ID or Type and Workflow IDs:\n\n```\ntemporal activity unpause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --reset-attempts\n --reset-heartbeats\n```" } s.Command.Args = cobra.NoArgs - s.Command.Flags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow ID. You must set either --workflow-id or --query.") - s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. Can only be set with --workflow-id. Do not use with --query.") s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "Activity ID to unpause. Can only be used without --query.") s.Command.Flags().StringVarP(&s.ActivityType, "activity-type", "g", "", "Activity Type to unpause.") s.Command.Flags().StringVar(&s.Identity, "identity", "", "Identity of the user submitting this request.") s.Command.Flags().BoolVar(&s.ResetAttempts, "reset-attempts", false, "Also reset the activity attempts.") s.Command.Flags().BoolVar(&s.ResetHeartbeats, "reset-heartbeats", false, "Reset the Activity's heartbeats. Only works with --reset-attempts.") - s.Command.Flags().StringVarP(&s.Query, "query", "q", "", "Content for an SQL-like `QUERY` List Filter.") - s.Command.Flags().StringVar(&s.Reason, "reason", "", "Reason for unpause. Can only be used with --query.") - s.Command.Flags().BoolVar(&s.MatchAll, "match-all", false, "Every paused activity should be unpaused. Can only be used with --query.") + s.Command.Flags().BoolVar(&s.MatchAll, "match-all", false, "Every paused activity should be unpaused. This flag is ignored if activity-type is provided. Can only be used with --query.") s.Jitter = 0 s.Command.Flags().VarP(&s.Jitter, "jitter", "j", "The activity will start at random a time within the specified duration. Can only be used with --query.") - s.Command.Flags().BoolVarP(&s.Yes, "yes", "y", false, "Don't prompt to confirm termination. Can only be used with --query.") - s.Command.Flags().Float32Var(&s.Rps, "rps", 0, "Limit batch's requests per second. Can only be used with --query.") + s.SingleWorkflowOrBatchOptions.buildFlags(cctx, s.Command.Flags()) s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { cctx.Options.Fail(err) diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index a70cbdea..67d2d3af 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -458,16 +458,6 @@ commands: --reset-heartbeats ``` options: - - name: workflow-id - short: w - type: string - description: | - Workflow ID. You must set either --workflow-id or --query. - - name: run-id - short: r - type: string - description: | - Run ID. Can only be set with --workflow-id. Do not use with --query. - name: activity-id short: a type: string @@ -487,35 +477,20 @@ commands: type: bool description: | Reset the Activity's heartbeats. Only works with --reset-attempts. - - name: query - short: q - type: string - description: | - Content for an SQL-like `QUERY` List Filter. - - name: reason - type: string - description: | - Reason for unpause. Can only be used with --query. - name: match-all type: bool description: | - Every paused activity should be unpaused. - Can only be used with --query. + Every paused activity should be unpaused. This flag is ignored if + activity-type is provided. Can only be used with --query. - name: jitter type: duration short: j description: | The activity will start at random a time within the specified duration. Can only be used with --query. - - name: yes - short: y - type: bool - description: | - Don't prompt to confirm termination. Can only be used with --query. - - name: rps - type: float - description: | - Limit batch's requests per second. Can only be used with --query. + option-sets: + - single-workflow-or-batch + - name: temporal activity reset summary: Reset an Activity From 5111d03b4819055f8a088320bd70eb40d89ecb74 Mon Sep 17 00:00:00 2001 From: Yuri Date: Wed, 5 Feb 2025 10:53:59 -0800 Subject: [PATCH 3/3] work on comments --- temporalcli/commands.activity_test.go | 44 ++++++++++++++++----------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/temporalcli/commands.activity_test.go b/temporalcli/commands.activity_test.go index a99495ba..2c258d19 100644 --- a/temporalcli/commands.activity_test.go +++ b/temporalcli/commands.activity_test.go @@ -323,6 +323,25 @@ func (s *SharedServerSuite) getActivityEvents(workflowID, activityID string) ( return started, completed, failed } +func checkActivitiesRunning(s *SharedServerSuite, run client.WorkflowRun) { + s.Eventually(func() bool { + resp, err := s.Client.DescribeWorkflowExecution(s.Context, run.GetID(), run.GetRunID()) + s.NoError(err) + return len(resp.GetPendingActivities()) > 0 + }, 5*time.Second, 200*time.Millisecond) +} + +func checkActivitiesPaused(s *SharedServerSuite, run client.WorkflowRun) { + s.Eventually(func() bool { + resp, err := s.Client.DescribeWorkflowExecution(s.Context, run.GetID(), run.GetRunID()) + s.NoError(err) + if resp.GetPendingActivities() == nil { + return false + } + return len(resp.GetPendingActivities()) > 0 && resp.GetPendingActivities()[0].Paused + }, 5*time.Second, 200*time.Millisecond) +} + func (s *SharedServerSuite) TestUnpauseActivity_BatchSuccess() { var failActivity atomic.Bool failActivity.Store(true) @@ -368,23 +387,8 @@ func (s *SharedServerSuite) TestUnpauseActivity_BatchSuccess() { s.NoError(res.Err) // wait for activities to be paused - s.Eventually(func() bool { - resp, err := s.Client.DescribeWorkflowExecution(s.Context, run1.GetID(), run1.GetRunID()) - s.NoError(err) - if resp.GetPendingActivities() == nil { - return false - } - return len(resp.PendingActivities) > 0 && resp.PendingActivities[0].Paused - }, 5*time.Second, 100*time.Millisecond) - - s.Eventually(func() bool { - resp, err := s.Client.DescribeWorkflowExecution(s.Context, run2.GetID(), run2.GetRunID()) - s.NoError(err) - if resp.GetPendingActivities() == nil { - return false - } - return len(resp.PendingActivities) > 0 && resp.PendingActivities[0].Paused - }, 5*time.Second, 100*time.Millisecond) + checkActivitiesPaused(s, run1) + checkActivitiesPaused(s, run2) var lastRequestLock sync.Mutex var startBatchRequest *workflowservice.StartBatchOperationRequest @@ -415,6 +419,10 @@ func (s *SharedServerSuite) TestUnpauseActivity_BatchSuccess() { s.NoError(cmdRes.Err) s.NotEmpty(startBatchRequest.JobId) - // unblock the activities + // check activities are running + checkActivitiesRunning(s, run1) + checkActivitiesRunning(s, run2) + + // unblock the activities to let them finish failActivity.Store(false) }