Fix task inMemoryNoUserLatency 2.0 (#4431)
yycptt authored Jun 2, 2023
1 parent b34d21a commit ce47340
Showing 2 changed files with 127 additions and 47 deletions.
14 changes: 10 additions & 4 deletions service/history/queues/executable.go
@@ -221,6 +221,12 @@ func (e *executableImpl) Execute() (retErr error) {
priorityTaggedProvider := e.taggedMetricsHandler.WithTags(metrics.TaskPriorityTag(e.priority.String()))
priorityTaggedProvider.Counter(metrics.TaskRequests.GetMetricName()).Record(1)
priorityTaggedProvider.Timer(metrics.TaskScheduleLatency.GetMetricName()).Record(e.scheduleLatency)

if retErr == nil {
e.inMemoryNoUserLatency += e.scheduleLatency + e.attemptNoUserLatency
}
// If retErr is not nil, HandleErr will take care of the inMemoryNoUserLatency calculation.
// It's not done here because, for certain errors, the latency of the attempt should not be counted.
}()

metricsTags, isActive, err := e.executor.Execute(ctx, e)
@@ -237,6 +243,10 @@ func (e *executableImpl) Execute() (retErr error) {
}

func (e *executableImpl) HandleErr(err error) (retErr error) {
if err == nil {
return nil
}

defer func() {
if !errors.Is(retErr, consts.ErrResourceExhaustedBusyWorkflow) {
// if err is due to workflow busy, do not take any latency related to this attempt into account
@@ -255,10 +265,6 @@ func (e *executableImpl) HandleErr(err error) (retErr error) {
}
}()

if err == nil {
return nil
}

var resourceExhaustedErr *serviceerror.ResourceExhausted
if errors.As(err, &resourceExhaustedErr) {
if resourceExhaustedErr.Cause != enums.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW {
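For reference, the change above makes Execute accumulate inMemoryNoUserLatency itself on success, while HandleErr now returns early on a nil error before registering its deferred accounting, so (as read from the diff) the success path is counted only once. Below is a minimal standalone sketch of the resulting accumulation rule; the names attempt, inMemoryNoUserLatency, and errBusyWorkflow are illustrative only and do not exist in the repository, and the real logic lives on executableImpl in executable.go.

package main

import (
	"errors"
	"fmt"
	"time"
)

// errBusyWorkflow stands in for consts.ErrResourceExhaustedBusyWorkflow.
var errBusyWorkflow = errors.New("resource exhausted: busy workflow")

// attempt captures the per-attempt timings tracked by the executable.
type attempt struct {
	scheduleLatency time.Duration // time spent waiting to be picked up for execution
	attemptLatency  time.Duration // wall-clock duration of the execution attempt
	userLatency     time.Duration // portion attributed to user/workflow-lock latency
	err             error         // outcome of the attempt
}

// inMemoryNoUserLatency applies the rule from the diff above: every attempt adds
// scheduleLatency + attemptLatency - userLatency, except attempts that failed
// with the busy-workflow error, which are skipped entirely.
func inMemoryNoUserLatency(attempts []attempt) time.Duration {
	var total time.Duration
	for _, a := range attempts {
		if errors.Is(a.err, errBusyWorkflow) {
			continue // busy-workflow attempts do not count toward the latency
		}
		total += a.scheduleLatency + a.attemptLatency - a.userLatency
	}
	return total
}

func main() {
	// Two attempts: the first fails with the busy-workflow error and is
	// excluded, so only the second attempt's latency is counted.
	attempts := []attempt{
		{scheduleLatency: 50 * time.Millisecond, attemptLatency: time.Second, userLatency: 200 * time.Millisecond, err: errBusyWorkflow},
		{scheduleLatency: 100 * time.Millisecond, attemptLatency: 2 * time.Second, userLatency: 300 * time.Millisecond},
	}
	fmt.Println(inMemoryNoUserLatency(attempts)) // 1.8s
}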
160 changes: 117 additions & 43 deletions service/history/queues/executable_test.go
@@ -103,64 +103,138 @@ func (s *executableSuite) TestExecute_TaskExecuted() {
s.NoError(executable.Execute())
}

func (s *executableSuite) TestExecute_InMemoryNoUserLatency() {
executable := s.newTestExecutable()

func (s *executableSuite) TestExecute_InMemoryNoUserLatency_SingleAttempt() {
scheduleLatency := 100 * time.Millisecond
userLatency := 500 * time.Millisecond
attemptLatency := time.Second
attemptNoUserLatency := scheduleLatency + attemptLatency - userLatency

testCases := []struct {
taskErr error
expectError bool
expectedAttemptNoUserLatency time.Duration
expectBackoff bool
}{
{
taskErr: nil,
expectError: false,
expectedAttemptNoUserLatency: attemptNoUserLatency,
expectBackoff: false,
},
{
taskErr: serviceerror.NewUnavailable("some random error"),
expectError: true,
expectedAttemptNoUserLatency: attemptNoUserLatency,
expectBackoff: true,
},
{
taskErr: serviceerror.NewNotFound("not found error"),
expectError: false,
expectedAttemptNoUserLatency: attemptNoUserLatency,
expectBackoff: false,
},
{
taskErr: consts.ErrResourceExhaustedBusyWorkflow,
expectError: true,
expectedAttemptNoUserLatency: 0,
expectBackoff: false,
},
}

now := time.Now()
s.timeSource.Update(now)
executable.SetScheduledTime(now)

now = now.Add(scheduleLatency)
s.timeSource.Update(now)
for _, tc := range testCases {
executable := s.newTestExecutable()

s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Do(func(ctx context.Context, taskInfo interface{}) {
metrics.ContextCounterAdd(
ctx,
metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName(),
int64(userLatency),
)
now := time.Now()
s.timeSource.Update(now)
executable.SetScheduledTime(now)

now = now.Add(attemptLatency)
now = now.Add(scheduleLatency)
s.timeSource.Update(now)
}).Return(nil, true, consts.ErrResourceExhaustedBusyWorkflow)
err := executable.HandleErr(executable.Execute())
s.Equal(consts.ErrResourceExhaustedBusyWorkflow, err)

s.mockScheduler.EXPECT().TrySubmit(executable).Return(false)
s.mockRescheduler.EXPECT().Add(executable, gomock.Any())
executable.Nack(err)
s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Do(func(ctx context.Context, taskInfo interface{}) {
metrics.ContextCounterAdd(
ctx,
metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName(),
int64(userLatency),
)

now = now.Add(attemptLatency)
s.timeSource.Update(now)
}).Return(nil, true, tc.taskErr)

err := executable.Execute()
if err != nil {
err = executable.HandleErr(err)
}

if tc.expectError {
s.Error(err)
s.mockScheduler.EXPECT().TrySubmit(executable).Return(false)
s.mockRescheduler.EXPECT().Add(executable, gomock.Any())
executable.Nack(err)
} else {
s.NoError(err)
}

actualAttemptNoUserLatency := executable.(*executableImpl).inMemoryNoUserLatency
if tc.expectBackoff {
// the backoff duration is random, so we can't compare the exact value
s.Less(tc.expectedAttemptNoUserLatency, actualAttemptNoUserLatency)
} else {
s.Equal(tc.expectedAttemptNoUserLatency, actualAttemptNoUserLatency)
}
}
}

// backoff duration
now = now.Add(time.Second)
s.timeSource.Update(now)
func (s *executableSuite) TestExecute_InMemoryNoUserLatency_MultipleAttempts() {
numAttempts := 3
scheduleLatencies := []time.Duration{100 * time.Millisecond, 150 * time.Millisecond, 200 * time.Millisecond}
userLatencies := []time.Duration{10 * time.Millisecond, 20 * time.Millisecond, 30 * time.Millisecond}
attemptLatencies := []time.Duration{time.Second, 2 * time.Second, 3 * time.Second}
taskErrors := []error{
serviceerror.NewUnavailable("test unavailable error"),
consts.ErrResourceExhaustedBusyWorkflow,
nil,
}
expectedInMemoryNoUserLatency := scheduleLatencies[0] + attemptLatencies[0] - userLatencies[0] +
scheduleLatencies[2] + attemptLatencies[2] - userLatencies[2]

// use a different set of latencies to test we are calculating the latency for the second attempt
scheduleLatency = 200 * time.Millisecond
userLatency = 300 * time.Millisecond
attemptLatency = 500 * time.Millisecond
executable := s.newTestExecutable()

executable.SetScheduledTime(now)
now = now.Add(scheduleLatency)
now := time.Now()
s.timeSource.Update(now)
executable.SetScheduledTime(now)

s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Do(func(ctx context.Context, taskInfo interface{}) {
metrics.ContextCounterAdd(
ctx,
metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName(),
int64(userLatency),
)

now = now.Add(attemptLatency)
for i := 0; i != numAttempts; i++ {
now = now.Add(scheduleLatencies[i])
s.timeSource.Update(now)
}).Return(nil, true, nil)
err = executable.HandleErr(executable.Execute())
s.NoError(err)

s.Equal(scheduleLatency+attemptLatency-userLatency, executable.(*executableImpl).inMemoryNoUserLatency)
s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Do(func(ctx context.Context, taskInfo interface{}) {
metrics.ContextCounterAdd(
ctx,
metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName(),
int64(userLatencies[i]),
)

now = now.Add(attemptLatencies[i])
s.timeSource.Update(now)
}).Return(nil, true, taskErrors[i])

err := executable.Execute()
if err != nil {
err = executable.HandleErr(err)
}

if taskErrors[i] != nil {
s.Error(err)
s.mockScheduler.EXPECT().TrySubmit(executable).Return(true)
executable.Nack(err)
} else {
s.NoError(err)
}
}

s.Equal(expectedInMemoryNoUserLatency, executable.(*executableImpl).inMemoryNoUserLatency)
}

func (s *executableSuite) TestExecute_CapturePanic() {
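For the multi-attempt test above, assuming the latencies and errors as listed in the diff, the expected total works out as follows: attempt 1 (Unavailable) contributes 100ms + 1s - 10ms = 1.09s, attempt 2 (busy workflow) contributes nothing, and attempt 3 (success) contributes 200ms + 3s - 30ms = 3.17s, so expectedInMemoryNoUserLatency = 4.26s.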
