diff --git a/service/history/queues/executable.go b/service/history/queues/executable.go index a7bca679492..2f9e146174d 100644 --- a/service/history/queues/executable.go +++ b/service/history/queues/executable.go @@ -221,6 +221,12 @@ func (e *executableImpl) Execute() (retErr error) { priorityTaggedProvider := e.taggedMetricsHandler.WithTags(metrics.TaskPriorityTag(e.priority.String())) priorityTaggedProvider.Counter(metrics.TaskRequests.GetMetricName()).Record(1) priorityTaggedProvider.Timer(metrics.TaskScheduleLatency.GetMetricName()).Record(e.scheduleLatency) + + if retErr == nil { + e.inMemoryNoUserLatency += e.scheduleLatency + e.attemptNoUserLatency + } + // if retErr is not nil, HandleErr will take care of the inMemoryNoUserLatency calculation + // Not doing it here as for certain errors latency for the attempt should not be counted }() metricsTags, isActive, err := e.executor.Execute(ctx, e) @@ -237,6 +243,10 @@ func (e *executableImpl) Execute() (retErr error) { } func (e *executableImpl) HandleErr(err error) (retErr error) { + if err == nil { + return nil + } + defer func() { if !errors.Is(retErr, consts.ErrResourceExhaustedBusyWorkflow) { // if err is due to workflow busy, do not take any latency related to this attempt into account @@ -255,10 +265,6 @@ func (e *executableImpl) HandleErr(err error) (retErr error) { } }() - if err == nil { - return nil - } - var resourceExhaustedErr *serviceerror.ResourceExhausted if errors.As(err, &resourceExhaustedErr) { if resourceExhaustedErr.Cause != enums.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW { diff --git a/service/history/queues/executable_test.go b/service/history/queues/executable_test.go index 9257e3a2c5e..7ea1dbd099c 100644 --- a/service/history/queues/executable_test.go +++ b/service/history/queues/executable_test.go @@ -103,64 +103,138 @@ func (s *executableSuite) TestExecute_TaskExecuted() { s.NoError(executable.Execute()) } -func (s *executableSuite) TestExecute_InMemoryNoUserLatency() { - executable := s.newTestExecutable() - +func (s *executableSuite) TestExecute_InMemoryNoUserLatency_SingleAttempt() { scheduleLatency := 100 * time.Millisecond userLatency := 500 * time.Millisecond attemptLatency := time.Second + attemptNoUserLatency := scheduleLatency + attemptLatency - userLatency + + testCases := []struct { + taskErr error + expectError bool + expectedAttemptNoUserLatency time.Duration + expectBackoff bool + }{ + { + taskErr: nil, + expectError: false, + expectedAttemptNoUserLatency: attemptNoUserLatency, + expectBackoff: false, + }, + { + taskErr: serviceerror.NewUnavailable("some random error"), + expectError: true, + expectedAttemptNoUserLatency: attemptNoUserLatency, + expectBackoff: true, + }, + { + taskErr: serviceerror.NewNotFound("not found error"), + expectError: false, + expectedAttemptNoUserLatency: attemptNoUserLatency, + expectBackoff: false, + }, + { + taskErr: consts.ErrResourceExhaustedBusyWorkflow, + expectError: true, + expectedAttemptNoUserLatency: 0, + expectBackoff: false, + }, + } - now := time.Now() - s.timeSource.Update(now) - executable.SetScheduledTime(now) - - now = now.Add(scheduleLatency) - s.timeSource.Update(now) + for _, tc := range testCases { + executable := s.newTestExecutable() - s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Do(func(ctx context.Context, taskInfo interface{}) { - metrics.ContextCounterAdd( - ctx, - metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName(), - int64(userLatency), - ) + now := time.Now() + s.timeSource.Update(now) + executable.SetScheduledTime(now) - now = now.Add(attemptLatency) + now = now.Add(scheduleLatency) s.timeSource.Update(now) - }).Return(nil, true, consts.ErrResourceExhaustedBusyWorkflow) - err := executable.HandleErr(executable.Execute()) - s.Equal(consts.ErrResourceExhaustedBusyWorkflow, err) - s.mockScheduler.EXPECT().TrySubmit(executable).Return(false) - s.mockRescheduler.EXPECT().Add(executable, gomock.Any()) - executable.Nack(err) + s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Do(func(ctx context.Context, taskInfo interface{}) { + metrics.ContextCounterAdd( + ctx, + metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName(), + int64(userLatency), + ) + + now = now.Add(attemptLatency) + s.timeSource.Update(now) + }).Return(nil, true, tc.taskErr) + + err := executable.Execute() + if err != nil { + err = executable.HandleErr(err) + } + + if tc.expectError { + s.Error(err) + s.mockScheduler.EXPECT().TrySubmit(executable).Return(false) + s.mockRescheduler.EXPECT().Add(executable, gomock.Any()) + executable.Nack(err) + } else { + s.NoError(err) + } + + actualAttemptNoUserLatency := executable.(*executableImpl).inMemoryNoUserLatency + if tc.expectBackoff { + // the backoff duration is random, so we can't compare the exact value + s.Less(tc.expectedAttemptNoUserLatency, actualAttemptNoUserLatency) + } else { + s.Equal(tc.expectedAttemptNoUserLatency, actualAttemptNoUserLatency) + } + } +} - // backoff duration - now = now.Add(time.Second) - s.timeSource.Update(now) +func (s *executableSuite) TestExecute_InMemoryNoUserLatency_MultipleAttempts() { + numAttempts := 3 + scheduleLatencies := []time.Duration{100 * time.Millisecond, 150 * time.Millisecond, 200 * time.Millisecond} + userLatencies := []time.Duration{10 * time.Millisecond, 20 * time.Millisecond, 30 * time.Millisecond} + attemptLatencies := []time.Duration{time.Second, 2 * time.Second, 3 * time.Second} + taskErrors := []error{ + serviceerror.NewUnavailable("test unavailable error"), + consts.ErrResourceExhaustedBusyWorkflow, + nil, + } + expectedInMemoryNoUserLatency := scheduleLatencies[0] + attemptLatencies[0] - userLatencies[0] + + scheduleLatencies[2] + attemptLatencies[2] - userLatencies[2] - // use a different set of latencies to test we are calculating the latency for the second attempt - scheduleLatency = 200 * time.Millisecond - userLatency = 300 * time.Millisecond - attemptLatency = 500 * time.Millisecond + executable := s.newTestExecutable() - executable.SetScheduledTime(now) - now = now.Add(scheduleLatency) + now := time.Now() s.timeSource.Update(now) + executable.SetScheduledTime(now) - s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Do(func(ctx context.Context, taskInfo interface{}) { - metrics.ContextCounterAdd( - ctx, - metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName(), - int64(userLatency), - ) - - now = now.Add(attemptLatency) + for i := 0; i != numAttempts; i++ { + now = now.Add(scheduleLatencies[i]) s.timeSource.Update(now) - }).Return(nil, true, nil) - err = executable.HandleErr(executable.Execute()) - s.NoError(err) - s.Equal(scheduleLatency+attemptLatency-userLatency, executable.(*executableImpl).inMemoryNoUserLatency) + s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Do(func(ctx context.Context, taskInfo interface{}) { + metrics.ContextCounterAdd( + ctx, + metrics.HistoryWorkflowExecutionCacheLatency.GetMetricName(), + int64(userLatencies[i]), + ) + + now = now.Add(attemptLatencies[i]) + s.timeSource.Update(now) + }).Return(nil, true, taskErrors[i]) + + err := executable.Execute() + if err != nil { + err = executable.HandleErr(err) + } + + if taskErrors[i] != nil { + s.Error(err) + s.mockScheduler.EXPECT().TrySubmit(executable).Return(true) + executable.Nack(err) + } else { + s.NoError(err) + } + } + + s.Equal(expectedInMemoryNoUserLatency, executable.(*executableImpl).inMemoryNoUserLatency) } func (s *executableSuite) TestExecute_CapturePanic() {