Skip to content

Commit

Permalink
Revert "Fix errcheck in service/worker/ (temporalio#3731)"
Browse files Browse the repository at this point in the history
This reverts commit 1454f84.
  • Loading branch information
dnr committed Jan 7, 2023
1 parent 04ce8e3 commit 516a7b6
Showing 1 changed file with 26 additions and 62 deletions.
88 changes: 26 additions & 62 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,34 +188,21 @@ func (s *scheduler) run() error {
s.logger.Warn("Time went backwards", "from", t1, "to", t2)
t2 = t1
}
nextSleep, err := s.processTimeRange(
nextSleep := s.processTimeRange(
t1, t2,
// resolve this to the schedule's policy as late as possible
enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED,
false,
)
if err != nil {
return err
}
s.State.LastProcessedTime = timestamp.TimePtr(t2)
// handle signals after processing time range that just elapsed
scheduleChanged := s.processSignals()
if scheduleChanged {
// need to calculate sleep again
nextSleep, err = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false)
if err != nil {
return err
}
nextSleep = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false)
}
// try starting workflows in the buffer
for {
b, err := s.processBuffer()
if err != nil {
return err
}
if !b {
break
}
for s.processBuffer() {
}
s.updateMemoAndSearchAttributes()
// sleep returns on any of:
Expand Down Expand Up @@ -327,11 +314,11 @@ func (s *scheduler) processTimeRange(
t1, t2 time.Time,
overlapPolicy enumspb.ScheduleOverlapPolicy,
manual bool,
) (time.Duration, error) {
) time.Duration {
s.logger.Debug("processTimeRange", "t1", t1, "t2", t2, "overlapPolicy", overlapPolicy, "manual", manual)

if s.cspec == nil {
return invalidDuration, nil
return invalidDuration
}

catchupWindow := s.getCatchupWindow()
Expand All @@ -340,16 +327,14 @@ func (s *scheduler) processTimeRange(
// Run this logic in a SideEffect so that we can fix bugs there without breaking
// existing schedule workflows.
var next getNextTimeResult
if err := workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
return s.cspec.getNextTime(t1)
}).Get(&next); err != nil {
return 0, err
}
}).Get(&next)
t1 = next.Next
if t1.IsZero() {
return invalidDuration, nil
return invalidDuration
} else if t1.After(t2) {
return t1.Sub(t2), nil
return t1.Sub(t2)
}
if !manual && t2.Sub(t1) > catchupWindow {
s.logger.Warn("Schedule missed catchup window", "now", t2, "time", t1)
Expand Down Expand Up @@ -690,7 +675,7 @@ func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy en
}

// processBuffer should return true if there might be more work to do right now.
func (s *scheduler) processBuffer() (bool, error) {
func (s *scheduler) processBuffer() bool {
s.logger.Debug("processBuffer", "buffer", len(s.State.BufferedStarts), "running", len(s.Info.RunningWorkflows), "needRefresh", s.State.NeedRefresh)

// TODO: consider doing this always and removing needRefresh? we only end up here without
Expand All @@ -707,7 +692,7 @@ func (s *scheduler) processBuffer() (bool, error) {
req := s.Schedule.Action.GetStartWorkflow()
if req == nil || len(s.State.BufferedStarts) == 0 {
s.State.BufferedStarts = nil
return false, nil
return false
}

isRunning := len(s.Info.RunningWorkflows) > 0
Expand Down Expand Up @@ -744,15 +729,11 @@ func (s *scheduler) processBuffer() (bool, error) {
// Terminate or cancel if required (terminate overrides cancel if both are present)
if action.needTerminate {
for _, ex := range s.Info.RunningWorkflows {
if err := s.terminateWorkflow(ex); err != nil {
return false, err
}
s.terminateWorkflow(ex)
}
} else if action.needCancel {
for _, ex := range s.Info.RunningWorkflows {
if err := s.cancelWorkflow(ex); err != nil {
return false, err
}
s.cancelWorkflow(ex)
}
}

Expand All @@ -768,7 +749,7 @@ func (s *scheduler) processBuffer() (bool, error) {
}
}

return tryAgain, nil
return tryAgain
}

func (s *scheduler) recordAction(result *schedpb.ScheduleActionResult) {
Expand Down Expand Up @@ -807,10 +788,6 @@ func (s *scheduler) startWorkflow(
}
ctx := workflow.WithLocalActivityOptions(s.ctx, options)

requestID, err := s.newUUIDString()
if err != nil {
return nil, err
}
req := &schedspb.StartWorkflowRequest{
Request: &workflowservice.StartWorkflowExecutionRequest{
WorkflowId: workflowID,
Expand All @@ -821,7 +798,7 @@ func (s *scheduler) startWorkflow(
WorkflowRunTimeout: newWorkflow.WorkflowRunTimeout,
WorkflowTaskTimeout: newWorkflow.WorkflowTaskTimeout,
Identity: s.identity(),
RequestId: requestID,
RequestId: s.newUUIDString(),
WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
RetryPolicy: newWorkflow.RetryPolicy,
Memo: newWorkflow.Memo,
Expand All @@ -832,7 +809,7 @@ func (s *scheduler) startWorkflow(
ContinuedFailure: s.State.ContinuedFailure,
}
var res schedspb.StartWorkflowResponse
err = workflow.ExecuteLocalActivity(ctx, s.a.StartWorkflow, req).Get(s.ctx, &res)
err := workflow.ExecuteLocalActivity(ctx, s.a.StartWorkflow, req).Get(s.ctx, &res)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -908,60 +885,47 @@ func (s *scheduler) startLongPollWatcher(ex *commonpb.WorkflowExecution) {
s.watchingWorkflowId = ex.WorkflowId
}

func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) error {
func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) {
ctx := workflow.WithLocalActivityOptions(s.ctx, defaultLocalActivityOptions)
requestID, err := s.newUUIDString()
if err != nil {
return err
}
areq := &schedspb.CancelWorkflowRequest{
RequestId: requestID,
RequestId: s.newUUIDString(),
Identity: s.identity(),
Execution: ex,
Reason: "cancelled by schedule overlap policy",
}
err = workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil)
err := workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil)
if err != nil {
s.logger.Error("cancel workflow failed", "workflow", ex.WorkflowId, "error", err)
return err
}
// Note: the local activity has completed (or failed) here but the workflow might take time
// to close since a cancel is only a request.
return nil
}

func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) error {
func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) {
ctx := workflow.WithLocalActivityOptions(s.ctx, defaultLocalActivityOptions)
requestID, err := s.newUUIDString()
if err != nil {
return err
}
areq := &schedspb.TerminateWorkflowRequest{
RequestId: requestID,
RequestId: s.newUUIDString(),
Identity: s.identity(),
Execution: ex,
Reason: "terminated by schedule overlap policy",
}
err = workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil)
err := workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil)
if err != nil {
s.logger.Error("terminate workflow failed", "workflow", ex.WorkflowId, "error", err)
}
return err
}

func (s *scheduler) newUUIDString() (string, error) {
func (s *scheduler) newUUIDString() string {
if len(s.uuidBatch) == 0 {
if err := workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
out := make([]string, 10)
for i := range out {
out[i] = uuid.NewString()
}
return out
}).Get(&s.uuidBatch); err != nil {
return "", err
}
}).Get(&s.uuidBatch)
}
next := s.uuidBatch[0]
s.uuidBatch = s.uuidBatch[1:]
return next, nil
return next
}

0 comments on commit 516a7b6

Please sign in to comment.