Improve task latency calculation and retry behavior (#4408)
yycptt authored May 30, 2023
1 parent 35708d9 commit 8aab8ab
Showing 8 changed files with 141 additions and 105 deletions.
11 changes: 11 additions & 0 deletions common/backoff/retrypolicy.go
@@ -44,6 +44,11 @@ const (
defaultFirstPhaseMaximumAttempts = 3
)

var (
// DisabledRetryPolicy is a retry policy that never retries
DisabledRetryPolicy RetryPolicy = &disabledRetryPolicyImpl{}
)

type (
// RetryPolicy is the API which needs to be implemented by various retry policy implementations
RetryPolicy interface {
@@ -80,6 +85,8 @@ type (
secondPolicy RetryPolicy
}

disabledRetryPolicyImpl struct{}

systemClock struct{}

retrierImpl struct {
@@ -204,6 +211,10 @@ func (tp *TwoPhaseRetryPolicy) ComputeNextDelay(elapsedTime time.Duration, numAt
return nextInterval
}

func (r *disabledRetryPolicyImpl) ComputeNextDelay(_ time.Duration, _ int) time.Duration {
return done
}

// Now returns the current time using the system clock
func (t systemClock) Now() time.Time {
return time.Now().UTC()
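
For context on how a policy returning the done sentinel behaves, below is a minimal, self-contained sketch of a retry loop honoring such a policy. This is not the repository's backoff package: the local done constant, RetryPolicy interface, and retry helper are simplified stand-ins, under the assumption that done is a negative sentinel meaning "stop retrying".

package main

import (
	"errors"
	"fmt"
	"time"
)

// done is assumed to be a negative sentinel meaning "no more retries",
// mirroring what disabledRetryPolicyImpl.ComputeNextDelay returns above.
const done time.Duration = -1

// RetryPolicy is a simplified stand-in for the interface in common/backoff.
type RetryPolicy interface {
	ComputeNextDelay(elapsedTime time.Duration, numAttempts int) time.Duration
}

type disabledRetryPolicy struct{}

func (disabledRetryPolicy) ComputeNextDelay(time.Duration, int) time.Duration {
	return done
}

// retry keeps invoking op until it succeeds or the policy says stop.
func retry(op func() error, policy RetryPolicy) error {
	start := time.Now()
	for attempt := 1; ; attempt++ {
		err := op()
		if err == nil {
			return nil
		}
		delay := policy.ComputeNextDelay(time.Since(start), attempt)
		if delay < 0 {
			return err // disabled (or exhausted) policy: surface the last error immediately
		}
		time.Sleep(delay)
	}
}

func main() {
	err := retry(func() error { return errors.New("transient failure") }, disabledRetryPolicy{})
	fmt.Println(err) // fails without a single retry, since the policy never allows one
}

The updated RetryPolicy() in service/history/queues/executable.go below returns this disabled policy so that a failed attempt is never retried while it still holds a scheduler goroutine.
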
1 change: 0 additions & 1 deletion common/metrics/metric_defs.go
@@ -1334,7 +1334,6 @@ var (
TaskLoadLatency = NewTimerDef("task_latency_load") // latency from task generation to task loading (persistence scheduleToStart)
TaskScheduleLatency = NewTimerDef("task_latency_schedule") // latency from task submission to in-memory queue to processing (in-memory scheduleToStart)
TaskProcessingLatency = NewTimerDef("task_latency_processing") // latency for processing task one time
TaskProcessingUserLatency = NewTimerDef("task_latency_user") // latency for locking workflow execution
TaskLatency = NewTimerDef("task_latency") // task in-memory latency across multiple attempts
TaskQueueLatency = NewTimerDef("task_latency_queue") // task e2e latency
TaskAttempt = NewDimensionlessHistogramDef("task_attempt")
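
To make the relationship between these timers concrete, the sketch below computes each span from an illustrative task timeline. The event names and example values are mine, derived from the comments above, not code from the repository; task_latency (in-memory latency across attempts) is omitted because this commit changes how it is accumulated (see executable.go below).

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative timeline for a single task attempt (values are made up).
	generated := time.Now().Add(-5 * time.Second)   // task written to persistence
	loaded := generated.Add(2 * time.Second)        // task loaded into memory by the queue reader
	scheduled := loaded.Add(100 * time.Millisecond) // task submitted to the in-memory scheduler
	started := scheduled.Add(50 * time.Millisecond) // a worker goroutine starts the attempt
	finished := started.Add(200 * time.Millisecond) // the attempt completes

	fmt.Println("task_latency_load:", loaded.Sub(generated))       // persistence scheduleToStart
	fmt.Println("task_latency_schedule:", started.Sub(scheduled))  // in-memory scheduleToStart
	fmt.Println("task_latency_processing:", finished.Sub(started)) // one processing attempt
	fmt.Println("task_latency_queue:", finished.Sub(generated))    // end-to-end latency
}
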
9 changes: 0 additions & 9 deletions common/util.go
@@ -87,9 +87,6 @@ const (
completeTaskRetryMaxInterval = 1 * time.Second
completeTaskRetryMaxAttempts = 10

taskProcessingRetryInitialInterval = 50 * time.Millisecond
taskProcessingRetryMaxAttempts = 1

taskRescheduleInitialInterval = 1 * time.Second
taskRescheduleBackoffCoefficient = 1.1
taskRescheduleMaxInterval = 3 * time.Minute
@@ -235,12 +232,6 @@ func CreateCompleteTaskRetryPolicy() backoff.RetryPolicy {
WithMaximumAttempts(completeTaskRetryMaxAttempts)
}

// CreateTaskProcessingRetryPolicy creates a retry policy for task processing
func CreateTaskProcessingRetryPolicy() backoff.RetryPolicy {
return backoff.NewExponentialRetryPolicy(taskProcessingRetryInitialInterval).
WithMaximumAttempts(taskProcessingRetryMaxAttempts)
}

// CreateTaskReschedulePolicy creates a retry policy for rescheduling task with errors not equal to ErrTaskRetry
func CreateTaskReschedulePolicy() backoff.RetryPolicy {
return backoff.NewExponentialRetryPolicy(taskRescheduleInitialInterval).
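
For a rough feel of what the reschedule policy above produces, the loop below models an exponential backoff using the constants from this file. It is a simplified model only (no jitter, no expiration handling), not the actual common/backoff implementation.

package main

import (
	"fmt"
	"time"
)

// Constants copied from the reschedule policy above.
const (
	taskRescheduleInitialInterval    = 1 * time.Second
	taskRescheduleBackoffCoefficient = 1.1
	taskRescheduleMaxInterval        = 3 * time.Minute
)

func main() {
	delay := float64(taskRescheduleInitialInterval)
	for attempt := 1; attempt <= 10; attempt++ {
		d := time.Duration(delay)
		if d > taskRescheduleMaxInterval {
			d = taskRescheduleMaxInterval
		}
		fmt.Printf("attempt %d: backoff %v\n", attempt, d)
		delay *= taskRescheduleBackoffCoefficient
	}
}
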
2 changes: 0 additions & 2 deletions service/history/consts/const.go
@@ -87,8 +87,6 @@ var (
ErrUnknownCluster = serviceerror.NewInvalidArgument("unknown cluster")
// ErrBufferedQueryCleared is error indicating mutable state is cleared while buffered query is pending
ErrBufferedQueryCleared = serviceerror.NewUnavailable("buffered query cleared, please retry")
// ErrWorkflowBusy is error indicating workflow is currently busy and workflow context can't be locked within specified timeout
ErrWorkflowBusy = serviceerror.NewUnavailable("timeout locking workflow execution")
// ErrChildExecutionNotFound is error indicating pending child execution can't be found in workflow mutable state current branch
ErrChildExecutionNotFound = serviceerror.NewNotFound("Pending child execution not found.")
// ErrWorkflowNotReady is error indicating workflow mutable state is missing necessary information for handling the request
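
ErrWorkflowBusy is removed here; elsewhere in this commit the busy case is represented by consts.ErrResourceExhaustedBusyWorkflow, whose definition is not part of this diff. My assumption, based on how executable.go matches it below, is that it is a sentinel ResourceExhausted error carrying the BUSY_WORKFLOW cause; the constructor call and message in the sketch are guesses for illustration.

package main

import (
	"errors"
	"fmt"

	"go.temporal.io/api/enums/v1"
	"go.temporal.io/api/serviceerror"
)

// Assumed definition (not shown in this diff): a sentinel ResourceExhausted
// error with the BUSY_WORKFLOW cause, standing in for the removed ErrWorkflowBusy.
var errResourceExhaustedBusyWorkflow = serviceerror.NewResourceExhausted(
	enums.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW,
	"workflow is busy, please retry later",
)

func main() {
	// An executor might surface the condition as its own ResourceExhausted
	// instance (the message below reuses the removed ErrWorkflowBusy text).
	err := serviceerror.NewResourceExhausted(
		enums.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW,
		"timeout locking workflow execution",
	)
	fmt.Println(errors.Is(err, errResourceExhaustedBusyWorkflow)) // false: a different instance

	// HandleErr below normalizes such errors to the sentinel, so later checks
	// can rely on errors.Is matching the sentinel by identity.
	var resourceExhaustedErr *serviceerror.ResourceExhausted
	if errors.As(err, &resourceExhaustedErr) &&
		resourceExhaustedErr.Cause == enums.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW {
		err = errResourceExhaustedBusyWorkflow
	}
	fmt.Println(errors.Is(err, errResourceExhaustedBusyWorkflow)) // true
}

This separation lets HandleErr treat busy workflows differently from system-level resource exhaustion: only the latter increments systemResourceExhaustedCount and the task-throttled counter.
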
151 changes: 74 additions & 77 deletions service/history/queues/executable.go
@@ -34,6 +34,7 @@ import (
"sync"
"time"

"go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common"
@@ -73,10 +74,6 @@ type (
)

var (
// schedulerRetryPolicy is the retry policy for retrying the executable
// in one submission to scheduler, the goroutine for processing this executable
// is held during the retry
schedulerRetryPolicy = common.CreateTaskProcessingRetryPolicy()
// reschedulePolicy is the policy for determine reschedule backoff duration
// across multiple submissions to scheduler
reschedulePolicy = common.CreateTaskReschedulePolicy()
@@ -116,16 +113,18 @@ type (
timeSource clock.TimeSource
namespaceRegistry namespace.Registry
clusterMetadata cluster.Metadata

readerID int64
loadTime time.Time
scheduledTime time.Time
userLatency time.Duration
lastActiveness bool
resourceExhaustedCount int
logger log.Logger
metricsHandler metrics.Handler
taggedMetricsHandler metrics.Handler
logger log.Logger
metricsHandler metrics.Handler

readerID int64
loadTime time.Time
scheduledTime time.Time
scheduleLatency time.Duration
attemptNoUserLatency time.Duration
inMemoryNoUserLatency time.Duration
lastActiveness bool
systemResourceExhaustedCount int
taggedMetricsHandler metrics.Handler
}
)

@@ -169,6 +168,10 @@ func NewExecutable(
}

func (e *executableImpl) Execute() (retErr error) {

startTime := e.timeSource.Now()
e.scheduleLatency = startTime.Sub(e.scheduledTime)

e.Lock()
if e.state != ctasks.TaskStatePending {
e.Unlock()
@@ -204,9 +207,21 @@ func (e *executableImpl) Execute() (retErr error) {
// is actually used which is upto the executor implementation
e.taggedMetricsHandler = e.metricsHandler.WithTags(e.estimateTaskMetricTag()...)
}
}()

startTime := e.timeSource.Now()
attemptUserLatency := time.Duration(0)
if duration, ok := metrics.ContextCounterGet(ctx, metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName()); ok {
attemptUserLatency = time.Duration(duration)
}

attemptLatency := e.timeSource.Now().Sub(startTime)
e.attemptNoUserLatency = attemptLatency - attemptUserLatency
// emit total attempt latency so that we know how much time a task will occupy a worker goroutine
e.taggedMetricsHandler.Timer(metrics.TaskProcessingLatency.GetMetricName()).Record(attemptLatency)

priorityTaggedProvider := e.taggedMetricsHandler.WithTags(metrics.TaskPriorityTag(e.priority.String()))
priorityTaggedProvider.Counter(metrics.TaskRequests.GetMetricName()).Record(1)
priorityTaggedProvider.Timer(metrics.TaskScheduleLatency.GetMetricName()).Record(e.scheduleLatency)
}()

metricsTags, isActive, err := e.executor.Execute(ctx, e)
e.taggedMetricsHandler = e.metricsHandler.WithTags(metricsTags...)
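
In short, the deferred block in Execute above now splits an attempt's wall time into the wait in the scheduler (scheduleLatency) and the attempt itself (attemptLatency), then subtracts the user latency read from the metrics context (for example, time spent acquiring the workflow lock) to get attemptNoUserLatency. A small standalone sketch of that arithmetic with made-up numbers; the variable and metric names come from the diff, the surrounding program does not:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative timestamps for a single attempt (values are made up).
	scheduledTime := time.Now()
	startTime := scheduledTime.Add(40 * time.Millisecond) // a worker goroutine picks the task up
	endTime := startTime.Add(250 * time.Millisecond)      // executor.Execute returns

	// User latency, e.g. HistoryWorkflowExecutionCacheLatency from the metrics context.
	attemptUserLatency := 80 * time.Millisecond

	scheduleLatency := startTime.Sub(scheduledTime) // recorded as task_latency_schedule
	attemptLatency := endTime.Sub(startTime)        // recorded as task_latency_processing
	attemptNoUserLatency := attemptLatency - attemptUserLatency

	fmt.Println(scheduleLatency, attemptLatency, attemptNoUserLatency) // 40ms 250ms 170ms
}

The full attempt latency is still emitted as task_latency_processing, while the user portion is excluded from the in-memory latency (inMemoryNoUserLatency) that Ack records as task_latency.
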
@@ -218,23 +233,16 @@
}
e.lastActiveness = isActive

e.userLatency = 0
if duration, ok := metrics.ContextCounterGet(ctx, metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName()); ok {
e.userLatency = time.Duration(duration)
}

e.taggedMetricsHandler.Timer(metrics.TaskProcessingLatency.GetMetricName()).Record(time.Since(startTime))
e.taggedMetricsHandler.Timer(metrics.TaskProcessingUserLatency.GetMetricName()).Record(e.userLatency)

priorityTaggedProvider := e.taggedMetricsHandler.WithTags(metrics.TaskPriorityTag(e.priority.String()))
priorityTaggedProvider.Counter(metrics.TaskRequests.GetMetricName()).Record(1)
priorityTaggedProvider.Timer(metrics.TaskScheduleLatency.GetMetricName()).Record(startTime.Sub(e.scheduledTime))

return err
}

func (e *executableImpl) HandleErr(err error) (retErr error) {
defer func() {
if errors.Is(retErr, consts.ErrResourceExhaustedBusyWorkflow) {
// if err is due to workflow busy, do not take any latency related to this attempt into account
e.inMemoryNoUserLatency += e.scheduleLatency + e.attemptNoUserLatency
}

if retErr != nil {
e.Lock()
defer e.Unlock()
@@ -251,12 +259,17 @@
return nil
}

if common.IsResourceExhausted(err) {
e.resourceExhaustedCount++
e.taggedMetricsHandler.Counter(metrics.TaskThrottledCounter.GetMetricName()).Record(1)
return err
var resourceExhaustedErr *serviceerror.ResourceExhausted
if errors.As(err, &resourceExhaustedErr) {
if resourceExhaustedErr.Cause != enums.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW {
e.systemResourceExhaustedCount++
e.taggedMetricsHandler.Counter(metrics.TaskThrottledCounter.GetMetricName()).Record(1)
return err
}

err = consts.ErrResourceExhaustedBusyWorkflow
}
e.resourceExhaustedCount = 0
e.systemResourceExhaustedCount = 0

if _, isNotFound := err.(*serviceerror.NotFound); isNotFound {
return nil
@@ -277,7 +290,7 @@
return err
}

if err == consts.ErrWorkflowBusy {
if errors.Is(err, consts.ErrResourceExhaustedBusyWorkflow) {
e.taggedMetricsHandler.Counter(metrics.TaskWorkflowBusyCounter.GetMetricName()).Record(1)
return err
}
@@ -323,40 +336,17 @@ func (e *executableImpl) IsRetryableError(err error) bool {

func (e *executableImpl) IsRetryableError(err error) bool {
// this determines if the executable should be retried when hold the worker goroutine

if e.State() != ctasks.TaskStatePending {
return false
}

if shard.IsShardOwnershipLostError(err) {
return false
}

// don't retry immediately for resource exhausted which may incur more load
// context deadline exceed may also suggested downstream is overloaded, so don't retry immediately
if common.IsResourceExhausted(err) || common.IsContextDeadlineExceededErr(err) {
return false
}

// Internal error is non-retryable and usually means unexpected error has happened,
// e.g. unknown task, corrupted state, panic etc.
if common.IsInternalError(err) {
return false
}

// ErrTaskRetry means mutable state is not ready for standby task processing
// there's no point for retrying the task immediately which will hold the worker corouinte
// TODO: change ErrTaskRetry to a better name
return err != consts.ErrTaskRetry &&
err != consts.ErrWorkflowBusy &&
err != consts.ErrDependencyTaskNotCompleted &&
err != consts.ErrNamespaceHandover
//
// never retry task while holding the goroutine, and rely on shouldResubmitOnNack
return false
}

func (e *executableImpl) RetryPolicy() backoff.RetryPolicy {
// this is the retry policy for one submission
// not for calculating the backoff after the task is nacked
return schedulerRetryPolicy
//
// never retry task while holding the goroutine, and rely on shouldResubmitOnNack
return backoff.DisabledRetryPolicy
}

func (e *executableImpl) Abort() {
@@ -394,7 +384,7 @@ func (e *executableImpl) Ack() {
e.taggedMetricsHandler.Histogram(metrics.TaskAttempt.GetMetricName(), metrics.TaskAttempt.GetMetricUnit()).Record(int64(e.attempt))

priorityTaggedProvider := e.taggedMetricsHandler.WithTags(metrics.TaskPriorityTag(e.lowestPriority.String()))
priorityTaggedProvider.Timer(metrics.TaskLatency.GetMetricName()).Record(time.Since(e.loadTime))
priorityTaggedProvider.Timer(metrics.TaskLatency.GetMetricName()).Record(e.inMemoryNoUserLatency)

readerIDTaggedProvider := priorityTaggedProvider.WithTags(metrics.QueueReaderIDTag(e.readerID))
readerIDTaggedProvider.Timer(metrics.TaskQueueLatency.GetMetricName()).Record(time.Since(e.GetVisibilityTime()))
@@ -418,7 +408,11 @@ func (e *executableImpl) Nack(err error) {
}

if !submitted {
e.rescheduler.Add(e, e.rescheduleTime(err, e.Attempt()))
backoffDuration := e.backoffDuration(err, e.Attempt())
e.rescheduler.Add(e, e.timeSource.Now().Add(backoffDuration))
if !errors.Is(err, consts.ErrResourceExhaustedBusyWorkflow) {
e.inMemoryNoUserLatency += backoffDuration
}
}
}

@@ -430,7 +424,7 @@ func (e *executableImpl) Reschedule() {

e.updatePriority()

e.rescheduler.Add(e, e.rescheduleTime(nil, e.Attempt()))
e.rescheduler.Add(e, e.timeSource.Now().Add(e.backoffDuration(nil, e.Attempt())))
}

func (e *executableImpl) State() ctasks.State {
@@ -474,8 +468,9 @@ func (e *executableImpl) shouldResubmitOnNack(attempt int, err error) bool {
return false
}

if common.IsResourceExhausted(err) &&
e.resourceExhaustedCount > resourceExhaustedResubmitMaxAttempts {
if !errors.Is(err, consts.ErrResourceExhaustedBusyWorkflow) &&
common.IsResourceExhausted(err) &&
e.systemResourceExhaustedCount > resourceExhaustedResubmitMaxAttempts {
return false
}

Expand All @@ -492,10 +487,10 @@ func (e *executableImpl) shouldResubmitOnNack(attempt int, err error) bool {
err != consts.ErrNamespaceHandover
}

func (e *executableImpl) rescheduleTime(
func (e *executableImpl) backoffDuration(
err error,
attempt int,
) time.Time {
) time.Duration {
// elapsedTime, the first parameter in ComputeNextDelay is not relevant here
// since reschedule policy has no expiration interval.

@@ -504,22 +499,24 @@
common.IsInternalError(err) {
// using a different reschedule policy to slow down retry
// as immediate retry typically won't resolve the issue.
return e.timeSource.Now().Add(taskNotReadyReschedulePolicy.ComputeNextDelay(0, attempt))
return taskNotReadyReschedulePolicy.ComputeNextDelay(0, attempt)
}

if err == consts.ErrDependencyTaskNotCompleted {
return e.timeSource.Now().Add(dependencyTaskNotCompletedReschedulePolicy.ComputeNextDelay(0, attempt))
return dependencyTaskNotCompletedReschedulePolicy.ComputeNextDelay(0, attempt)
}

backoffDuration := reschedulePolicy.ComputeNextDelay(0, attempt)
if common.IsResourceExhausted(err) {
if !errors.Is(err, consts.ErrResourceExhaustedBusyWorkflow) && common.IsResourceExhausted(err) {
// try a different reschedule policy to slow down retry
// upon resource exhausted error and pick the longer backoff
// duration
backoffDuration = util.Max(backoffDuration, taskResourceExhuastedReschedulePolicy.ComputeNextDelay(0, e.resourceExhaustedCount))
// upon system resource exhausted error and pick the longer backoff duration
backoffDuration = util.Max(
backoffDuration,
taskResourceExhuastedReschedulePolicy.ComputeNextDelay(0, e.systemResourceExhaustedCount),
)
}

return e.timeSource.Now().Add(backoffDuration)
return backoffDuration
}

func (e *executableImpl) updatePriority() {
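
To summarize the selection in backoffDuration above: task-not-ready style errors (ErrTaskRetry, ErrNamespaceHandover, internal errors) use a slower dedicated policy, ErrDependencyTaskNotCompleted has its own policy (omitted below), and system-level resource exhaustion takes the longer of the normal reschedule backoff and a resource-exhausted backoff. The sketch below models that decision with simplified local stand-ins; the delays are placeholders, not the values produced by the real policies.

package main

import (
	"errors"
	"fmt"
	"time"
)

// Simplified stand-ins for the reschedule policies referenced above; the real
// ones are exponential policies from common/backoff.
func normalBackoff(attempt int) time.Duration          { return time.Duration(attempt) * time.Second }
func notReadyBackoff(attempt int) time.Duration        { return time.Duration(attempt) * 5 * time.Second }
func resourceExhaustedBackoff(count int) time.Duration { return time.Duration(count) * 3 * time.Second }

// Sentinel errors standing in for the consts used in executable.go.
var (
	errTaskRetry               = errors.New("task retry (standby mutable state not ready)")
	errSystemResourceExhausted = errors.New("system resource exhausted")
)

func maxDuration(a, b time.Duration) time.Duration {
	if a > b {
		return a
	}
	return b
}

// backoffDuration mirrors the structure of the updated method above.
func backoffDuration(err error, attempt, systemResourceExhaustedCount int) time.Duration {
	if errors.Is(err, errTaskRetry) {
		// mutable state not ready: slow down, an immediate retry will not help
		return notReadyBackoff(attempt)
	}

	d := normalBackoff(attempt)
	if errors.Is(err, errSystemResourceExhausted) {
		// The real code checks common.IsResourceExhausted(err) and excludes the
		// busy-workflow cause; system throttling picks the longer backoff.
		d = maxDuration(d, resourceExhaustedBackoff(systemResourceExhaustedCount))
	}
	return d
}

func main() {
	fmt.Println(backoffDuration(errTaskRetry, 2, 0))               // 10s
	fmt.Println(backoffDuration(errors.New("other error"), 2, 0))  // 2s
	fmt.Println(backoffDuration(errSystemResourceExhausted, 2, 3)) // 9s: max(2s, 9s)
}

Nack above feeds this duration into the rescheduler and, for errors other than busy workflow, also counts the backoff toward the in-memory task latency.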