From 8aab8ab9c1a88b8793b16bdbea24928e18cf1eb1 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 30 May 2023 15:36:15 -0700 Subject: [PATCH] Improve task latency calculation and retry behavior (#4408) --- common/backoff/retrypolicy.go | 11 ++ common/metrics/metric_defs.go | 1 - common/util.go | 9 -- service/history/consts/const.go | 2 - service/history/queues/executable.go | 151 +++++++++--------- service/history/queues/executable_test.go | 60 +++++-- service/history/timerQueueTaskExecutorBase.go | 7 +- .../history/transferQueueTaskExecutorBase.go | 5 +- 8 files changed, 141 insertions(+), 105 deletions(-) diff --git a/common/backoff/retrypolicy.go b/common/backoff/retrypolicy.go index 1a9f2479079..65efffaaaa5 100644 --- a/common/backoff/retrypolicy.go +++ b/common/backoff/retrypolicy.go @@ -44,6 +44,11 @@ const ( defaultFirstPhaseMaximumAttempts = 3 ) +var ( + // DisabledRetryPolicy is a retry policy that never retries + DisabledRetryPolicy RetryPolicy = &disabledRetryPolicyImpl{} +) + type ( // RetryPolicy is the API which needs to be implemented by various retry policy implementations RetryPolicy interface { @@ -80,6 +85,8 @@ type ( secondPolicy RetryPolicy } + disabledRetryPolicyImpl struct{} + systemClock struct{} retrierImpl struct { @@ -204,6 +211,10 @@ func (tp *TwoPhaseRetryPolicy) ComputeNextDelay(elapsedTime time.Duration, numAt return nextInterval } +func (r *disabledRetryPolicyImpl) ComputeNextDelay(_ time.Duration, _ int) time.Duration { + return done +} + // Now returns the current time using the system clock func (t systemClock) Now() time.Time { return time.Now().UTC() diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 057220b341b..f8332546847 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1334,7 +1334,6 @@ var ( TaskLoadLatency = NewTimerDef("task_latency_load") // latency from task generation to task loading (persistence scheduleToStart) TaskScheduleLatency = NewTimerDef("task_latency_schedule") // latency from task submission to in-memory queue to processing (in-memory scheduleToStart) TaskProcessingLatency = NewTimerDef("task_latency_processing") // latency for processing task one time - TaskProcessingUserLatency = NewTimerDef("task_latency_user") // latency for locking workflow execution TaskLatency = NewTimerDef("task_latency") // task in-memory latency across multiple attempts TaskQueueLatency = NewTimerDef("task_latency_queue") // task e2e latency TaskAttempt = NewDimensionlessHistogramDef("task_attempt") diff --git a/common/util.go b/common/util.go index bdb89c76074..3afc5ea2e1e 100644 --- a/common/util.go +++ b/common/util.go @@ -87,9 +87,6 @@ const ( completeTaskRetryMaxInterval = 1 * time.Second completeTaskRetryMaxAttempts = 10 - taskProcessingRetryInitialInterval = 50 * time.Millisecond - taskProcessingRetryMaxAttempts = 1 - taskRescheduleInitialInterval = 1 * time.Second taskRescheduleBackoffCoefficient = 1.1 taskRescheduleMaxInterval = 3 * time.Minute @@ -235,12 +232,6 @@ func CreateCompleteTaskRetryPolicy() backoff.RetryPolicy { WithMaximumAttempts(completeTaskRetryMaxAttempts) } -// CreateTaskProcessingRetryPolicy creates a retry policy for task processing -func CreateTaskProcessingRetryPolicy() backoff.RetryPolicy { - return backoff.NewExponentialRetryPolicy(taskProcessingRetryInitialInterval). 
- WithMaximumAttempts(taskProcessingRetryMaxAttempts) -} - // CreateTaskReschedulePolicy creates a retry policy for rescheduling task with errors not equal to ErrTaskRetry func CreateTaskReschedulePolicy() backoff.RetryPolicy { return backoff.NewExponentialRetryPolicy(taskRescheduleInitialInterval). diff --git a/service/history/consts/const.go b/service/history/consts/const.go index db2cf88d9c3..0d1b23d364a 100644 --- a/service/history/consts/const.go +++ b/service/history/consts/const.go @@ -87,8 +87,6 @@ var ( ErrUnknownCluster = serviceerror.NewInvalidArgument("unknown cluster") // ErrBufferedQueryCleared is error indicating mutable state is cleared while buffered query is pending ErrBufferedQueryCleared = serviceerror.NewUnavailable("buffered query cleared, please retry") - // ErrWorkflowBusy is error indicating workflow is currently busy and workflow context can't be locked within specified timeout - ErrWorkflowBusy = serviceerror.NewUnavailable("timeout locking workflow execution") // ErrChildExecutionNotFound is error indicating pending child execution can't be found in workflow mutable state current branch ErrChildExecutionNotFound = serviceerror.NewNotFound("Pending child execution not found.") // ErrWorkflowNotReady is error indicating workflow mutable state is missing necessary information for handling the request diff --git a/service/history/queues/executable.go b/service/history/queues/executable.go index 2d427c90641..b58042d59c9 100644 --- a/service/history/queues/executable.go +++ b/service/history/queues/executable.go @@ -34,6 +34,7 @@ import ( "sync" "time" + "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/server/common" @@ -73,10 +74,6 @@ type ( ) var ( - // schedulerRetryPolicy is the retry policy for retrying the executable - // in one submission to scheduler, the goroutine for processing this executable - // is held during the retry - schedulerRetryPolicy = common.CreateTaskProcessingRetryPolicy() // reschedulePolicy is the policy for determine reschedule backoff duration // across multiple submissions to scheduler reschedulePolicy = common.CreateTaskReschedulePolicy() @@ -116,16 +113,18 @@ type ( timeSource clock.TimeSource namespaceRegistry namespace.Registry clusterMetadata cluster.Metadata - - readerID int64 - loadTime time.Time - scheduledTime time.Time - userLatency time.Duration - lastActiveness bool - resourceExhaustedCount int - logger log.Logger - metricsHandler metrics.Handler - taggedMetricsHandler metrics.Handler + logger log.Logger + metricsHandler metrics.Handler + + readerID int64 + loadTime time.Time + scheduledTime time.Time + scheduleLatency time.Duration + attemptNoUserLatency time.Duration + inMemoryNoUserLatency time.Duration + lastActiveness bool + systemResourceExhaustedCount int + taggedMetricsHandler metrics.Handler } ) @@ -169,6 +168,10 @@ func NewExecutable( } func (e *executableImpl) Execute() (retErr error) { + + startTime := e.timeSource.Now() + e.scheduleLatency = startTime.Sub(e.scheduledTime) + e.Lock() if e.state != ctasks.TaskStatePending { e.Unlock() @@ -204,9 +207,21 @@ func (e *executableImpl) Execute() (retErr error) { // is actually used which is upto the executor implementation e.taggedMetricsHandler = e.metricsHandler.WithTags(e.estimateTaskMetricTag()...) 
} - }() - startTime := e.timeSource.Now() + attemptUserLatency := time.Duration(0) + if duration, ok := metrics.ContextCounterGet(ctx, metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName()); ok { + attemptUserLatency = time.Duration(duration) + } + + attemptLatency := e.timeSource.Now().Sub(startTime) + e.attemptNoUserLatency = attemptLatency - attemptUserLatency + // emit total attempt latency so that we know how much time a task will occpy a worker goroutine + e.taggedMetricsHandler.Timer(metrics.TaskProcessingLatency.GetMetricName()).Record(attemptLatency) + + priorityTaggedProvider := e.taggedMetricsHandler.WithTags(metrics.TaskPriorityTag(e.priority.String())) + priorityTaggedProvider.Counter(metrics.TaskRequests.GetMetricName()).Record(1) + priorityTaggedProvider.Timer(metrics.TaskScheduleLatency.GetMetricName()).Record(e.scheduleLatency) + }() metricsTags, isActive, err := e.executor.Execute(ctx, e) e.taggedMetricsHandler = e.metricsHandler.WithTags(metricsTags...) @@ -218,23 +233,16 @@ func (e *executableImpl) Execute() (retErr error) { } e.lastActiveness = isActive - e.userLatency = 0 - if duration, ok := metrics.ContextCounterGet(ctx, metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName()); ok { - e.userLatency = time.Duration(duration) - } - - e.taggedMetricsHandler.Timer(metrics.TaskProcessingLatency.GetMetricName()).Record(time.Since(startTime)) - e.taggedMetricsHandler.Timer(metrics.TaskProcessingUserLatency.GetMetricName()).Record(e.userLatency) - - priorityTaggedProvider := e.taggedMetricsHandler.WithTags(metrics.TaskPriorityTag(e.priority.String())) - priorityTaggedProvider.Counter(metrics.TaskRequests.GetMetricName()).Record(1) - priorityTaggedProvider.Timer(metrics.TaskScheduleLatency.GetMetricName()).Record(startTime.Sub(e.scheduledTime)) - return err } func (e *executableImpl) HandleErr(err error) (retErr error) { defer func() { + if errors.Is(retErr, consts.ErrResourceExhaustedBusyWorkflow) { + // if err is due to workflow busy, do not take any latency related to this attempt into account + e.inMemoryNoUserLatency += e.scheduleLatency + e.attemptNoUserLatency + } + if retErr != nil { e.Lock() defer e.Unlock() @@ -251,12 +259,17 @@ func (e *executableImpl) HandleErr(err error) (retErr error) { return nil } - if common.IsResourceExhausted(err) { - e.resourceExhaustedCount++ - e.taggedMetricsHandler.Counter(metrics.TaskThrottledCounter.GetMetricName()).Record(1) - return err + var resourceExhaustedErr *serviceerror.ResourceExhausted + if errors.As(err, &resourceExhaustedErr) { + if resourceExhaustedErr.Cause != enums.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW { + e.systemResourceExhaustedCount++ + e.taggedMetricsHandler.Counter(metrics.TaskThrottledCounter.GetMetricName()).Record(1) + return err + } + + err = consts.ErrResourceExhaustedBusyWorkflow } - e.resourceExhaustedCount = 0 + e.systemResourceExhaustedCount = 0 if _, isNotFound := err.(*serviceerror.NotFound); isNotFound { return nil @@ -277,7 +290,7 @@ func (e *executableImpl) HandleErr(err error) (retErr error) { return err } - if err == consts.ErrWorkflowBusy { + if errors.Is(err, consts.ErrResourceExhaustedBusyWorkflow) { e.taggedMetricsHandler.Counter(metrics.TaskWorkflowBusyCounter.GetMetricName()).Record(1) return err } @@ -323,40 +336,17 @@ func (e *executableImpl) HandleErr(err error) (retErr error) { func (e *executableImpl) IsRetryableError(err error) bool { // this determines if the executable should be retried when hold the worker goroutine - - if e.State() != ctasks.TaskStatePending { - 
return false - } - - if shard.IsShardOwnershipLostError(err) { - return false - } - - // don't retry immediately for resource exhausted which may incur more load - // context deadline exceed may also suggested downstream is overloaded, so don't retry immediately - if common.IsResourceExhausted(err) || common.IsContextDeadlineExceededErr(err) { - return false - } - - // Internal error is non-retryable and usually means unexpected error has happened, - // e.g. unknown task, corrupted state, panic etc. - if common.IsInternalError(err) { - return false - } - - // ErrTaskRetry means mutable state is not ready for standby task processing - // there's no point for retrying the task immediately which will hold the worker corouinte - // TODO: change ErrTaskRetry to a better name - return err != consts.ErrTaskRetry && - err != consts.ErrWorkflowBusy && - err != consts.ErrDependencyTaskNotCompleted && - err != consts.ErrNamespaceHandover + // + // never retry task while holding the goroutine, and rely on shouldResubmitOnNack + return false } func (e *executableImpl) RetryPolicy() backoff.RetryPolicy { // this is the retry policy for one submission // not for calculating the backoff after the task is nacked - return schedulerRetryPolicy + // + // never retry task while holding the goroutine, and rely on shouldResubmitOnNack + return backoff.DisabledRetryPolicy } func (e *executableImpl) Abort() { @@ -394,7 +384,7 @@ func (e *executableImpl) Ack() { e.taggedMetricsHandler.Histogram(metrics.TaskAttempt.GetMetricName(), metrics.TaskAttempt.GetMetricUnit()).Record(int64(e.attempt)) priorityTaggedProvider := e.taggedMetricsHandler.WithTags(metrics.TaskPriorityTag(e.lowestPriority.String())) - priorityTaggedProvider.Timer(metrics.TaskLatency.GetMetricName()).Record(time.Since(e.loadTime)) + priorityTaggedProvider.Timer(metrics.TaskLatency.GetMetricName()).Record(e.inMemoryNoUserLatency) readerIDTaggedProvider := priorityTaggedProvider.WithTags(metrics.QueueReaderIDTag(e.readerID)) readerIDTaggedProvider.Timer(metrics.TaskQueueLatency.GetMetricName()).Record(time.Since(e.GetVisibilityTime())) @@ -418,7 +408,11 @@ func (e *executableImpl) Nack(err error) { } if !submitted { - e.rescheduler.Add(e, e.rescheduleTime(err, e.Attempt())) + backoffDuration := e.backoffDuration(err, e.Attempt()) + e.rescheduler.Add(e, e.timeSource.Now().Add(backoffDuration)) + if !errors.Is(err, consts.ErrResourceExhaustedBusyWorkflow) { + e.inMemoryNoUserLatency += backoffDuration + } } } @@ -430,7 +424,7 @@ func (e *executableImpl) Reschedule() { e.updatePriority() - e.rescheduler.Add(e, e.rescheduleTime(nil, e.Attempt())) + e.rescheduler.Add(e, e.timeSource.Now().Add(e.backoffDuration(nil, e.Attempt()))) } func (e *executableImpl) State() ctasks.State { @@ -474,8 +468,9 @@ func (e *executableImpl) shouldResubmitOnNack(attempt int, err error) bool { return false } - if common.IsResourceExhausted(err) && - e.resourceExhaustedCount > resourceExhaustedResubmitMaxAttempts { + if !errors.Is(err, consts.ErrResourceExhaustedBusyWorkflow) && + common.IsResourceExhausted(err) && + e.systemResourceExhaustedCount > resourceExhaustedResubmitMaxAttempts { return false } @@ -492,10 +487,10 @@ func (e *executableImpl) shouldResubmitOnNack(attempt int, err error) bool { err != consts.ErrNamespaceHandover } -func (e *executableImpl) rescheduleTime( +func (e *executableImpl) backoffDuration( err error, attempt int, -) time.Time { +) time.Duration { // elapsedTime, the first parameter in ComputeNextDelay is not relevant here // since reschedule 
policy has no expiration interval. @@ -504,22 +499,24 @@ func (e *executableImpl) rescheduleTime( common.IsInternalError(err) { // using a different reschedule policy to slow down retry // as immediate retry typically won't resolve the issue. - return e.timeSource.Now().Add(taskNotReadyReschedulePolicy.ComputeNextDelay(0, attempt)) + return taskNotReadyReschedulePolicy.ComputeNextDelay(0, attempt) } if err == consts.ErrDependencyTaskNotCompleted { - return e.timeSource.Now().Add(dependencyTaskNotCompletedReschedulePolicy.ComputeNextDelay(0, attempt)) + return dependencyTaskNotCompletedReschedulePolicy.ComputeNextDelay(0, attempt) } backoffDuration := reschedulePolicy.ComputeNextDelay(0, attempt) - if common.IsResourceExhausted(err) { + if !errors.Is(err, consts.ErrResourceExhaustedBusyWorkflow) && common.IsResourceExhausted(err) { // try a different reschedule policy to slow down retry - // upon resource exhausted error and pick the longer backoff - // duration - backoffDuration = util.Max(backoffDuration, taskResourceExhuastedReschedulePolicy.ComputeNextDelay(0, e.resourceExhaustedCount)) + // upon system resource exhausted error and pick the longer backoff duration + backoffDuration = util.Max( + backoffDuration, + taskResourceExhuastedReschedulePolicy.ComputeNextDelay(0, e.systemResourceExhaustedCount), + ) } - return e.timeSource.Now().Add(backoffDuration) + return backoffDuration } func (e *executableImpl) updatePriority() { diff --git a/service/history/queues/executable_test.go b/service/history/queues/executable_test.go index df0f18b4f68..20f1ee8522e 100644 --- a/service/history/queues/executable_test.go +++ b/service/history/queues/executable_test.go @@ -103,17 +103,59 @@ func (s *executableSuite) TestExecute_TaskExecuted() { s.NoError(executable.Execute()) } -func (s *executableSuite) TestExecute_UserLatency() { +func (s *executableSuite) TestExecute_InMemoryNoUserLatency() { executable := s.newTestExecutable() - expectedUserLatency := int64(133) - updateContext := func(ctx context.Context, taskInfo interface{}) { - metrics.ContextCounterAdd(ctx, metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName(), expectedUserLatency) - } - - s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Do(updateContext).Return(nil, true, nil) - s.NoError(executable.Execute()) - s.Equal(time.Duration(expectedUserLatency), executable.(*executableImpl).userLatency) + scheduleLatency := 100 * time.Millisecond + userLatency := 500 * time.Millisecond + attemptLatency := time.Second + + now := time.Now() + s.timeSource.Update(now) + executable.SetScheduledTime(now) + + now = now.Add(scheduleLatency) + s.timeSource.Update(now) + + s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Do(func(ctx context.Context, taskInfo interface{}) { + metrics.ContextCounterAdd( + ctx, + metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName(), + int64(userLatency), + ) + + now = now.Add(attemptLatency) + s.timeSource.Update(now) + }).Return(nil, true, consts.ErrResourceExhaustedBusyWorkflow) + err := executable.HandleErr(executable.Execute()) + s.Equal(consts.ErrResourceExhaustedBusyWorkflow, err) + + s.mockScheduler.EXPECT().TrySubmit(executable).Return(false) + s.mockRescheduler.EXPECT().Add(executable, gomock.Any()) + executable.Nack(err) + + // backoff duration + now = now.Add(time.Second) + s.timeSource.Update(now) + + executable.SetScheduledTime(now) + now = now.Add(scheduleLatency) + s.timeSource.Update(now) + + s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Do(func(ctx context.Context, 
taskInfo interface{}) { + metrics.ContextCounterAdd( + ctx, + metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName(), + int64(userLatency), + ) + + now = now.Add(attemptLatency) + s.timeSource.Update(now) + }).Return(nil, true, nil) + err = executable.HandleErr(executable.Execute()) + s.NoError(err) + + s.Equal(scheduleLatency+attemptLatency-userLatency, executable.(*executableImpl).inMemoryNoUserLatency) } func (s *executableSuite) TestExecute_CapturePanic() { diff --git a/service/history/timerQueueTaskExecutorBase.go b/service/history/timerQueueTaskExecutorBase.go index d1e2c95e4f9..05ee66981d8 100644 --- a/service/history/timerQueueTaskExecutorBase.go +++ b/service/history/timerQueueTaskExecutorBase.go @@ -170,9 +170,8 @@ func getWorkflowExecutionContext( namespaceID namespace.ID, execution commonpb.WorkflowExecution, ) (workflow.Context, wcache.ReleaseCacheFunc, error) { - ctx, cancel := context.WithTimeout(ctx, taskGetExecutionTimeout) - defer cancel() - + // workflowCache will automatically use short context timeout when + // locking workflow for all background calls, we don't need a separate context here weContext, release, err := workflowCache.GetOrCreateWorkflowExecution( ctx, namespaceID, @@ -180,7 +179,7 @@ func getWorkflowExecutionContext( workflow.LockPriorityLow, ) if common.IsContextDeadlineExceededErr(err) { - err = consts.ErrWorkflowBusy + err = consts.ErrResourceExhaustedBusyWorkflow } return weContext, release, err } diff --git a/service/history/transferQueueTaskExecutorBase.go b/service/history/transferQueueTaskExecutorBase.go index 2e7e9891f92..c2b2cc2d176 100644 --- a/service/history/transferQueueTaskExecutorBase.go +++ b/service/history/transferQueueTaskExecutorBase.go @@ -58,9 +58,8 @@ import ( ) const ( - taskTimeout = time.Second * 3 * debug.TimeoutMultiplier - taskGetExecutionTimeout = time.Second * debug.TimeoutMultiplier - taskHistoryOpTimeout = 20 * time.Second + taskTimeout = time.Second * 3 * debug.TimeoutMultiplier + taskHistoryOpTimeout = 20 * time.Second ) var errUnknownTransferTask = serviceerror.NewInternal("Unknown transfer task")
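
The retry-behavior half of this change removes in-place retries entirely: the executable's RetryPolicy() now returns backoff.DisabledRetryPolicy, whose ComputeNextDelay always reports that no further retry should happen, and IsRetryableError unconditionally returns false, so a failed attempt releases the worker goroutine and any resubmission is decided later by shouldResubmitOnNack / Nack. Below is a minimal standalone sketch of that pattern under those assumptions; RetryPolicy, NoRetryPolicy, runOnce, and the noRetry sentinel are illustrative names, not the actual common/backoff API (which signals termination through its internal done value).

package main

import (
	"errors"
	"fmt"
	"time"
)

// RetryPolicy mirrors the shape of the interface in common/backoff:
// a negative delay means "stop retrying".
type RetryPolicy interface {
	ComputeNextDelay(elapsed time.Duration, attempt int) time.Duration
}

const noRetry time.Duration = -1

// NoRetryPolicy is an illustrative stand-in for backoff.DisabledRetryPolicy:
// every call reports that no in-place retry should happen.
type NoRetryPolicy struct{}

func (NoRetryPolicy) ComputeNextDelay(time.Duration, int) time.Duration { return noRetry }

// runOnce executes op under a retry policy; with NoRetryPolicy the loop
// exits after the first failure and the error is handed back to the caller,
// which decides whether to resubmit or reschedule with backoff.
func runOnce(op func() error, policy RetryPolicy) error {
	start := time.Now()
	for attempt := 1; ; attempt++ {
		err := op()
		if err == nil {
			return nil
		}
		delay := policy.ComputeNextDelay(time.Since(start), attempt)
		if delay < 0 {
			return err // no in-place retry; goroutine is released immediately
		}
		time.Sleep(delay)
	}
}

func main() {
	err := runOnce(func() error { return errors.New("transient failure") }, NoRetryPolicy{})
	fmt.Println("attempt finished with:", err)
}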
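
HandleErr now splits resource-exhausted errors by cause: only causes other than BUSY_WORKFLOW increment systemResourceExhaustedCount and the task-throttled counter, while busy-workflow errors are mapped to consts.ErrResourceExhaustedBusyWorkflow (the same error getWorkflowExecutionContext returns when the workflow lock times out). The sketch below shows that classification using the go.temporal.io/api packages the diff already imports; the construction of the busy-workflow error here is an assumption, since its definition is not part of this patch.

package main

import (
	"errors"
	"fmt"

	enumspb "go.temporal.io/api/enums/v1"
	"go.temporal.io/api/serviceerror"
)

// errBusyWorkflow stands in for consts.ErrResourceExhaustedBusyWorkflow,
// assumed here to be a ResourceExhausted error with the BUSY_WORKFLOW cause.
var errBusyWorkflow = serviceerror.NewResourceExhausted(
	enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW,
	"workflow is busy",
)

// classify mirrors the HandleErr logic: only non-busy-workflow resource
// exhaustion counts as system throttling; a busy workflow is tracked separately
// and never contributes to the system resource-exhausted count.
func classify(err error) string {
	var re *serviceerror.ResourceExhausted
	if errors.As(err, &re) {
		if re.Cause != enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW {
			return "system resource exhausted (throttled)"
		}
		return "busy workflow (not counted as system throttling)"
	}
	return "other error"
}

func main() {
	fmt.Println(classify(errBusyWorkflow))
	fmt.Println(classify(serviceerror.NewResourceExhausted(
		enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT, "rps limit exceeded")))
}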
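
The latency bookkeeping, per the in-code comments and the updated TestExecute_InMemoryNoUserLatency, works like this: each attempt measures schedule latency (scheduled time until a worker picks the task up) and attempt latency minus user latency (time spent acquiring the workflow lock); attempts that fail with the busy-workflow error contribute nothing; reschedule backoff is added only when the nack was not caused by a busy workflow; Ack then records the accumulated value as task_latency. The following is a simplified, self-contained model of that accumulation — attempt, errBusyWorkflow, and inMemoryNoUserLatency are illustrative stand-ins, not the executableImpl fields.

package main

import (
	"errors"
	"fmt"
	"time"
)

// errBusyWorkflow stands in for consts.ErrResourceExhaustedBusyWorkflow.
var errBusyWorkflow = errors.New("resource exhausted: busy workflow")

// attempt captures what one Execute/HandleErr/Nack cycle measures.
type attempt struct {
	schedule time.Duration // scheduled -> picked up by a worker
	total    time.Duration // wall-clock duration of the attempt
	user     time.Duration // time blocked acquiring the workflow lock
	backoff  time.Duration // reschedule backoff chosen on Nack
	err      error
}

// inMemoryNoUserLatency models the accumulation rule: skip attempts that
// failed because the workflow was busy, drop user latency from the rest,
// and count reschedule backoff only for non-busy failures.
func inMemoryNoUserLatency(attempts []attempt) time.Duration {
	var latency time.Duration
	for _, a := range attempts {
		if errors.Is(a.err, errBusyWorkflow) {
			continue // busy-workflow attempts don't count against the task
		}
		latency += a.schedule + (a.total - a.user)
		if a.err != nil {
			latency += a.backoff
		}
	}
	return latency
}

func main() {
	// Values from TestExecute_InMemoryNoUserLatency: the first attempt fails
	// with a busy workflow, the second succeeds.
	attempts := []attempt{
		{schedule: 100 * time.Millisecond, total: time.Second, user: 500 * time.Millisecond, backoff: time.Second, err: errBusyWorkflow},
		{schedule: 100 * time.Millisecond, total: time.Second, user: 500 * time.Millisecond},
	}
	fmt.Println(inMemoryNoUserLatency(attempts)) // 600ms: only the successful attempt counts
}

Plugging in the test's numbers (100ms schedule latency, 1s attempt latency, 500ms user latency, first attempt busy) yields the 600ms the test asserts for inMemoryNoUserLatency.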