Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix errcheck in service/worker/ #3731

Merged
merged 1 commit into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions service/worker/pernamespaceworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,10 @@ func (wm *perNamespaceWorkerManager) Start(
// this will call namespaceCallback with current namespaces
wm.namespaceRegistry.RegisterStateChangeCallback(wm, wm.namespaceCallback)

wm.serviceResolver.AddListener(perNamespaceWorkerManagerListenerKey, wm.membershipChangedCh)
err := wm.serviceResolver.AddListener(perNamespaceWorkerManagerListenerKey, wm.membershipChangedCh)
if err != nil {
wm.logger.Fatal("Unable to register membership listener", tag.Error(err))
}
go wm.membershipChangedListener()

wm.logger.Info("", tag.LifeCycleStarted)
Expand All @@ -169,7 +172,10 @@ func (wm *perNamespaceWorkerManager) Stop() {
wm.logger.Info("", tag.LifeCycleStopping)

wm.namespaceRegistry.UnregisterStateChangeCallback(wm)
wm.serviceResolver.RemoveListener(perNamespaceWorkerManagerListenerKey)
err := wm.serviceResolver.RemoveListener(perNamespaceWorkerManagerListenerKey)
if err != nil {
wm.logger.Error("Unable to unregister membership listener", tag.Error(err))
}
close(wm.membershipChangedCh)

wm.lock.Lock()
Expand Down
88 changes: 62 additions & 26 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,21 +188,34 @@ func (s *scheduler) run() error {
s.logger.Warn("Time went backwards", "from", t1, "to", t2)
t2 = t1
}
nextSleep := s.processTimeRange(
nextSleep, err := s.processTimeRange(
t1, t2,
// resolve this to the schedule's policy as late as possible
enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED,
false,
)
if err != nil {
return err
}
s.State.LastProcessedTime = timestamp.TimePtr(t2)
// handle signals after processing time range that just elapsed
scheduleChanged := s.processSignals()
if scheduleChanged {
// need to calculate sleep again
nextSleep = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false)
nextSleep, err = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false)
if err != nil {
return err
}
}
// try starting workflows in the buffer
for s.processBuffer() {
for {
b, err := s.processBuffer()
if err != nil {
return err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I appreciate the effort but this is wrong: returning an error here will end the workflow. Errors from terminate or cancel are supposed to be swallowed since they'll be retried at the next action attempt.

And most of the other changes in this file are just unnecessary clutter for type errors that can't ever happen.

Can we revert at least the changes to this file?

}
if !b {
break
}
}
s.updateMemoAndSearchAttributes()
// sleep returns on any of:
Expand Down Expand Up @@ -314,11 +327,11 @@ func (s *scheduler) processTimeRange(
t1, t2 time.Time,
overlapPolicy enumspb.ScheduleOverlapPolicy,
manual bool,
) time.Duration {
) (time.Duration, error) {
s.logger.Debug("processTimeRange", "t1", t1, "t2", t2, "overlapPolicy", overlapPolicy, "manual", manual)

if s.cspec == nil {
return invalidDuration
return invalidDuration, nil
}

catchupWindow := s.getCatchupWindow()
Expand All @@ -327,14 +340,16 @@ func (s *scheduler) processTimeRange(
// Run this logic in a SideEffect so that we can fix bugs there without breaking
// existing schedule workflows.
var next getNextTimeResult
workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
if err := workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
return s.cspec.getNextTime(t1)
}).Get(&next)
}).Get(&next); err != nil {
return 0, err
}
t1 = next.Next
if t1.IsZero() {
return invalidDuration
return invalidDuration, nil
} else if t1.After(t2) {
return t1.Sub(t2)
return t1.Sub(t2), nil
}
if !manual && t2.Sub(t1) > catchupWindow {
s.logger.Warn("Schedule missed catchup window", "now", t2, "time", t1)
Expand Down Expand Up @@ -675,7 +690,7 @@ func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy en
}

// processBuffer should return true if there might be more work to do right now.
func (s *scheduler) processBuffer() bool {
func (s *scheduler) processBuffer() (bool, error) {
s.logger.Debug("processBuffer", "buffer", len(s.State.BufferedStarts), "running", len(s.Info.RunningWorkflows), "needRefresh", s.State.NeedRefresh)

// TODO: consider doing this always and removing needRefresh? we only end up here without
Expand All @@ -692,7 +707,7 @@ func (s *scheduler) processBuffer() bool {
req := s.Schedule.Action.GetStartWorkflow()
if req == nil || len(s.State.BufferedStarts) == 0 {
s.State.BufferedStarts = nil
return false
return false, nil
}

isRunning := len(s.Info.RunningWorkflows) > 0
Expand Down Expand Up @@ -729,11 +744,15 @@ func (s *scheduler) processBuffer() bool {
// Terminate or cancel if required (terminate overrides cancel if both are present)
if action.needTerminate {
for _, ex := range s.Info.RunningWorkflows {
s.terminateWorkflow(ex)
if err := s.terminateWorkflow(ex); err != nil {
return false, err
}
}
} else if action.needCancel {
for _, ex := range s.Info.RunningWorkflows {
s.cancelWorkflow(ex)
if err := s.cancelWorkflow(ex); err != nil {
return false, err
}
}
}

Expand All @@ -749,7 +768,7 @@ func (s *scheduler) processBuffer() bool {
}
}

return tryAgain
return tryAgain, nil
}

func (s *scheduler) recordAction(result *schedpb.ScheduleActionResult) {
Expand Down Expand Up @@ -788,6 +807,10 @@ func (s *scheduler) startWorkflow(
}
ctx := workflow.WithLocalActivityOptions(s.ctx, options)

requestID, err := s.newUUIDString()
if err != nil {
return nil, err
}
req := &schedspb.StartWorkflowRequest{
Request: &workflowservice.StartWorkflowExecutionRequest{
WorkflowId: workflowID,
Expand All @@ -798,7 +821,7 @@ func (s *scheduler) startWorkflow(
WorkflowRunTimeout: newWorkflow.WorkflowRunTimeout,
WorkflowTaskTimeout: newWorkflow.WorkflowTaskTimeout,
Identity: s.identity(),
RequestId: s.newUUIDString(),
RequestId: requestID,
WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
RetryPolicy: newWorkflow.RetryPolicy,
Memo: newWorkflow.Memo,
Expand All @@ -809,7 +832,7 @@ func (s *scheduler) startWorkflow(
ContinuedFailure: s.State.ContinuedFailure,
}
var res schedspb.StartWorkflowResponse
err := workflow.ExecuteLocalActivity(ctx, s.a.StartWorkflow, req).Get(s.ctx, &res)
err = workflow.ExecuteLocalActivity(ctx, s.a.StartWorkflow, req).Get(s.ctx, &res)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -885,47 +908,60 @@ func (s *scheduler) startLongPollWatcher(ex *commonpb.WorkflowExecution) {
s.watchingWorkflowId = ex.WorkflowId
}

func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) {
func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) error {
ctx := workflow.WithLocalActivityOptions(s.ctx, defaultLocalActivityOptions)
requestID, err := s.newUUIDString()
if err != nil {
return err
}
areq := &schedspb.CancelWorkflowRequest{
RequestId: s.newUUIDString(),
RequestId: requestID,
Identity: s.identity(),
Execution: ex,
Reason: "cancelled by schedule overlap policy",
}
err := workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil)
err = workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil)
if err != nil {
s.logger.Error("cancel workflow failed", "workflow", ex.WorkflowId, "error", err)
return err
}
// Note: the local activity has completed (or failed) here but the workflow might take time
// to close since a cancel is only a request.
return nil
}

func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) {
func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) error {
ctx := workflow.WithLocalActivityOptions(s.ctx, defaultLocalActivityOptions)
requestID, err := s.newUUIDString()
if err != nil {
return err
}
areq := &schedspb.TerminateWorkflowRequest{
RequestId: s.newUUIDString(),
RequestId: requestID,
Identity: s.identity(),
Execution: ex,
Reason: "terminated by schedule overlap policy",
}
err := workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil)
err = workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil)
if err != nil {
s.logger.Error("terminate workflow failed", "workflow", ex.WorkflowId, "error", err)
}
return err
}

func (s *scheduler) newUUIDString() string {
func (s *scheduler) newUUIDString() (string, error) {
if len(s.uuidBatch) == 0 {
workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
if err := workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
out := make([]string, 10)
for i := range out {
out[i] = uuid.NewString()
}
return out
}).Get(&s.uuidBatch)
}).Get(&s.uuidBatch); err != nil {
return "", err
}
}
next := s.uuidBatch[0]
s.uuidBatch = s.uuidBatch[1:]
return next
return next, nil
}
4 changes: 3 additions & 1 deletion service/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ func (wm *workerManager) Start() {
}

for _, w := range wm.workers {
w.Start()
if err := w.Start(); err != nil {
wm.logger.Fatal("Unable to start worker", tag.Error(err))
}
}

wm.logger.Info("", tag.ComponentWorkerManager, tag.LifeCycleStarted)
Expand Down