diff --git a/service/worker/scheduler/workflow.go b/service/worker/scheduler/workflow.go index 4cf1ddda3a0..81191937a38 100644 --- a/service/worker/scheduler/workflow.go +++ b/service/worker/scheduler/workflow.go @@ -188,34 +188,21 @@ func (s *scheduler) run() error { s.logger.Warn("Time went backwards", "from", t1, "to", t2) t2 = t1 } - nextSleep, err := s.processTimeRange( + nextSleep := s.processTimeRange( t1, t2, // resolve this to the schedule's policy as late as possible enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false, ) - if err != nil { - return err - } s.State.LastProcessedTime = timestamp.TimePtr(t2) // handle signals after processing time range that just elapsed scheduleChanged := s.processSignals() if scheduleChanged { // need to calculate sleep again - nextSleep, err = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false) - if err != nil { - return err - } + nextSleep = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false) } // try starting workflows in the buffer - for { - b, err := s.processBuffer() - if err != nil { - return err - } - if !b { - break - } + for s.processBuffer() { } s.updateMemoAndSearchAttributes() // sleep returns on any of: @@ -327,11 +314,11 @@ func (s *scheduler) processTimeRange( t1, t2 time.Time, overlapPolicy enumspb.ScheduleOverlapPolicy, manual bool, -) (time.Duration, error) { +) time.Duration { s.logger.Debug("processTimeRange", "t1", t1, "t2", t2, "overlapPolicy", overlapPolicy, "manual", manual) if s.cspec == nil { - return invalidDuration, nil + return invalidDuration } catchupWindow := s.getCatchupWindow() @@ -340,16 +327,14 @@ func (s *scheduler) processTimeRange( // Run this logic in a SideEffect so that we can fix bugs there without breaking // existing schedule workflows. var next getNextTimeResult - if err := workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} { + workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} { return s.cspec.getNextTime(t1) - }).Get(&next); err != nil { - return 0, err - } + }).Get(&next) t1 = next.Next if t1.IsZero() { - return invalidDuration, nil + return invalidDuration } else if t1.After(t2) { - return t1.Sub(t2), nil + return t1.Sub(t2) } if !manual && t2.Sub(t1) > catchupWindow { s.logger.Warn("Schedule missed catchup window", "now", t2, "time", t1) @@ -690,7 +675,7 @@ func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy en } // processBuffer should return true if there might be more work to do right now. -func (s *scheduler) processBuffer() (bool, error) { +func (s *scheduler) processBuffer() bool { s.logger.Debug("processBuffer", "buffer", len(s.State.BufferedStarts), "running", len(s.Info.RunningWorkflows), "needRefresh", s.State.NeedRefresh) // TODO: consider doing this always and removing needRefresh? we only end up here without @@ -707,7 +692,7 @@ func (s *scheduler) processBuffer() (bool, error) { req := s.Schedule.Action.GetStartWorkflow() if req == nil || len(s.State.BufferedStarts) == 0 { s.State.BufferedStarts = nil - return false, nil + return false } isRunning := len(s.Info.RunningWorkflows) > 0 @@ -744,15 +729,11 @@ func (s *scheduler) processBuffer() (bool, error) { // Terminate or cancel if required (terminate overrides cancel if both are present) if action.needTerminate { for _, ex := range s.Info.RunningWorkflows { - if err := s.terminateWorkflow(ex); err != nil { - return false, err - } + s.terminateWorkflow(ex) } } else if action.needCancel { for _, ex := range s.Info.RunningWorkflows { - if err := s.cancelWorkflow(ex); err != nil { - return false, err - } + s.cancelWorkflow(ex) } } @@ -768,7 +749,7 @@ func (s *scheduler) processBuffer() (bool, error) { } } - return tryAgain, nil + return tryAgain } func (s *scheduler) recordAction(result *schedpb.ScheduleActionResult) { @@ -807,10 +788,6 @@ func (s *scheduler) startWorkflow( } ctx := workflow.WithLocalActivityOptions(s.ctx, options) - requestID, err := s.newUUIDString() - if err != nil { - return nil, err - } req := &schedspb.StartWorkflowRequest{ Request: &workflowservice.StartWorkflowExecutionRequest{ WorkflowId: workflowID, @@ -821,7 +798,7 @@ func (s *scheduler) startWorkflow( WorkflowRunTimeout: newWorkflow.WorkflowRunTimeout, WorkflowTaskTimeout: newWorkflow.WorkflowTaskTimeout, Identity: s.identity(), - RequestId: requestID, + RequestId: s.newUUIDString(), WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, RetryPolicy: newWorkflow.RetryPolicy, Memo: newWorkflow.Memo, @@ -832,7 +809,7 @@ func (s *scheduler) startWorkflow( ContinuedFailure: s.State.ContinuedFailure, } var res schedspb.StartWorkflowResponse - err = workflow.ExecuteLocalActivity(ctx, s.a.StartWorkflow, req).Get(s.ctx, &res) + err := workflow.ExecuteLocalActivity(ctx, s.a.StartWorkflow, req).Get(s.ctx, &res) if err != nil { return nil, err } @@ -908,60 +885,47 @@ func (s *scheduler) startLongPollWatcher(ex *commonpb.WorkflowExecution) { s.watchingWorkflowId = ex.WorkflowId } -func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) error { +func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) { ctx := workflow.WithLocalActivityOptions(s.ctx, defaultLocalActivityOptions) - requestID, err := s.newUUIDString() - if err != nil { - return err - } areq := &schedspb.CancelWorkflowRequest{ - RequestId: requestID, + RequestId: s.newUUIDString(), Identity: s.identity(), Execution: ex, Reason: "cancelled by schedule overlap policy", } - err = workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil) + err := workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil) if err != nil { s.logger.Error("cancel workflow failed", "workflow", ex.WorkflowId, "error", err) - return err } // Note: the local activity has completed (or failed) here but the workflow might take time // to close since a cancel is only a request. - return nil } -func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) error { +func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) { ctx := workflow.WithLocalActivityOptions(s.ctx, defaultLocalActivityOptions) - requestID, err := s.newUUIDString() - if err != nil { - return err - } areq := &schedspb.TerminateWorkflowRequest{ - RequestId: requestID, + RequestId: s.newUUIDString(), Identity: s.identity(), Execution: ex, Reason: "terminated by schedule overlap policy", } - err = workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil) + err := workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil) if err != nil { s.logger.Error("terminate workflow failed", "workflow", ex.WorkflowId, "error", err) } - return err } -func (s *scheduler) newUUIDString() (string, error) { +func (s *scheduler) newUUIDString() string { if len(s.uuidBatch) == 0 { - if err := workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} { + workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} { out := make([]string, 10) for i := range out { out[i] = uuid.NewString() } return out - }).Get(&s.uuidBatch); err != nil { - return "", err - } + }).Get(&s.uuidBatch) } next := s.uuidBatch[0] s.uuidBatch = s.uuidBatch[1:] - return next, nil + return next }