From 35dcba861fa548152d7edeb437773bed68cacbe5 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Mon, 27 Feb 2023 14:09:51 -0800 Subject: [PATCH] Improve scheduled queue lookahead logic (#3982) --- service/history/queues/queue_base.go | 12 +++--- service/history/queues/queue_base_test.go | 11 +++++- service/history/queues/queue_immediate.go | 10 ++--- service/history/queues/queue_scheduled.go | 37 +++++++++--------- .../history/queues/queue_scheduled_test.go | 13 ------- service/history/queues/reader.go | 18 +++++++++ service/history/queues/reader_test.go | 38 ++++++++++++------- 7 files changed, 79 insertions(+), 60 deletions(-) diff --git a/service/history/queues/queue_base.go b/service/history/queues/queue_base.go index 8a847718b14..55864847ef0 100644 --- a/service/history/queues/queue_base.go +++ b/service/history/queues/queue_base.go @@ -130,6 +130,7 @@ func newQueueBase( executor Executor, options *Options, hostReaderRateLimiter quotas.RequestRateLimiter, + completionFn ReaderCompletionFn, logger log.Logger, metricsHandler metrics.Handler, ) *queueBase { @@ -196,6 +197,7 @@ func newQueueBase( timeSource, readerRateLimiter, monitor, + completionFn, logger, metricsHandler, ) @@ -281,7 +283,7 @@ func (p *queueBase) FailoverNamespace( p.rescheduler.Reschedule(namespaceID) } -func (p *queueBase) processNewRange() error { +func (p *queueBase) processNewRange() { var newMaxKey tasks.Key switch categoryType := p.category.Type(); categoryType { case tasks.CategoryTypeImmediate: @@ -289,14 +291,15 @@ func (p *queueBase) processNewRange() error { case tasks.CategoryTypeScheduled: var err error if newMaxKey, err = p.shard.UpdateScheduledQueueExclusiveHighReadWatermark(); err != nil { - return err + p.logger.Error("Unable to process new range", tag.Error(err)) + return } default: panic(fmt.Sprintf("Unknown task category type: %v", categoryType.String())) } if !p.nonReadableScope.CanSplitByRange(newMaxKey) { - return nil + return } var newReadScope Scope @@ -311,7 +314,7 @@ func (p *queueBase) processNewRange() error { reader, ok := p.readerGroup.ReaderByID(DefaultReaderId) if !ok { p.readerGroup.NewReader(DefaultReaderId, newSlice) - return nil + return } if now := p.timeSource.Now(); now.After(p.nextForceNewSliceTime) { @@ -320,7 +323,6 @@ func (p *queueBase) processNewRange() error { } else { reader.MergeSlices(newSlice) } - return nil } func (p *queueBase) checkpoint() { diff --git a/service/history/queues/queue_base_test.go b/service/history/queues/queue_base_test.go index 23f9c1b0952..68e775e903a 100644 --- a/service/history/queues/queue_base_test.go +++ b/service/history/queues/queue_base_test.go @@ -142,6 +142,7 @@ func (s *queueBaseSuite) TestNewProcessBase_NoPreviousState() { nil, s.options, s.rateLimiter, + NoopReaderCompletionFn, s.logger, s.metricsHandler, ) @@ -225,6 +226,7 @@ func (s *queueBaseSuite) TestNewProcessBase_WithPreviousState() { nil, s.options, s.rateLimiter, + NoopReaderCompletionFn, s.logger, s.metricsHandler, ) @@ -284,13 +286,14 @@ func (s *queueBaseSuite) TestStartStop() { nil, s.options, s.rateLimiter, + NoopReaderCompletionFn, s.logger, s.metricsHandler, ) s.mockRescheduler.EXPECT().Start().Times(1) base.Start() - s.NoError(base.processNewRange()) + base.processNewRange() <-doneCh <-base.checkpointTimer.C @@ -333,12 +336,13 @@ func (s *queueBaseSuite) TestProcessNewRange() { nil, s.options, s.rateLimiter, + NoopReaderCompletionFn, s.logger, s.metricsHandler, ) s.True(base.nonReadableScope.Range.Equals(NewRange(tasks.MinimumKey, tasks.MaximumKey))) - s.NoError(base.processNewRange()) + base.processNewRange() defaultReader, ok := base.readerGroup.ReaderByID(DefaultReaderId) s.True(ok) scopes := defaultReader.Scopes() @@ -389,6 +393,7 @@ func (s *queueBaseSuite) TestCheckPoint_WithPendingTasks() { nil, s.options, s.rateLimiter, + NoopReaderCompletionFn, s.logger, s.metricsHandler, ) @@ -461,6 +466,7 @@ func (s *queueBaseSuite) TestCheckPoint_NoPendingTasks() { nil, s.options, s.rateLimiter, + NoopReaderCompletionFn, s.logger, s.metricsHandler, ) @@ -548,6 +554,7 @@ func (s *queueBaseSuite) TestCheckPoint_MoveSlices() { nil, s.options, s.rateLimiter, + NoopReaderCompletionFn, s.logger, s.metricsHandler, ) diff --git a/service/history/queues/queue_immediate.go b/service/history/queues/queue_immediate.go index 003b13ca3ad..538a0e5c7c8 100644 --- a/service/history/queues/queue_immediate.go +++ b/service/history/queues/queue_immediate.go @@ -96,6 +96,7 @@ func NewImmediateQueue( executor, options, hostRateLimiter, + NoopReaderCompletionFn, logger, metricsHandler, ), @@ -159,9 +160,7 @@ func (p *immediateQueue) processEventLoop() { case <-p.shutdownCh: return case <-p.notifyCh: - if err := p.processNewRange(); err != nil { - p.logger.Error("Unable to process new range", tag.Error(err)) - } + p.processNewRange() case <-pollTimer.C: p.processPollTimer(pollTimer) case <-p.checkpointTimer.C: @@ -173,10 +172,7 @@ func (p *immediateQueue) processEventLoop() { } func (p *immediateQueue) processPollTimer(pollTimer *time.Timer) { - if err := p.processNewRange(); err != nil { - p.logger.Error("Unable to process new range", tag.Error(err)) - } - + p.processNewRange() pollTimer.Reset(backoff.Jitter( p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient(), diff --git a/service/history/queues/queue_scheduled.go b/service/history/queues/queue_scheduled.go index 67bfe40a45c..40bbe462c2e 100644 --- a/service/history/queues/queue_scheduled.go +++ b/service/history/queues/queue_scheduled.go @@ -54,6 +54,7 @@ type ( newTimeLock sync.Mutex newTime time.Time + lookAheadCh chan struct{} lookAheadRateLimitRequest quotas.Request } ) @@ -106,6 +107,18 @@ func NewScheduledQueue( } } + lookAheadCh := make(chan struct{}, 1) + readerCompletionFn := func(readerID int32) { + if readerID != DefaultReaderId { + return + } + + select { + case lookAheadCh <- struct{}{}: + default: + } + } + return &scheduledQueue{ queueBase: newQueueBase( shard, @@ -117,6 +130,7 @@ func NewScheduledQueue( executor, options, hostRateLimiter, + readerCompletionFn, logger, metricsHandler, ), @@ -124,6 +138,7 @@ func NewScheduledQueue( timerGate: timer.NewLocalGate(shard.GetTimeSource()), newTimerCh: make(chan struct{}, 1), + lookAheadCh: lookAheadCh, lookAheadRateLimitRequest: newReaderRequest(DefaultReaderId), } } @@ -188,6 +203,8 @@ func (p *scheduledQueue) processEventLoop() { case <-p.newTimerCh: p.metricsHandler.Counter(metrics.NewTimerNotifyCounter.GetMetricName()).Record(1) p.processNewTime() + case <-p.lookAheadCh: + p.lookAheadTask() case <-p.timerGate.FireChan(): p.processNewRange() case <-p.checkpointTimer.C: @@ -222,26 +239,6 @@ func (p *scheduledQueue) processNewTime() { p.timerGate.Update(newTime) } -func (p *scheduledQueue) processNewRange() { - if err := p.queueBase.processNewRange(); err != nil { - // This only happens when shard state is invalid, - // in which case no look ahead is needed. - // Notification will be sent when shard is reacquired, but - // still set a max poll timer here as a catch all case. - p.timerGate.Update(p.timeSource.Now().Add(backoff.Jitter( - p.options.MaxPollInterval(), - p.options.MaxPollIntervalJitterCoefficient(), - ))) - return - } - - // Only do look ahead when shard state is valid. - // When shard is invalid, even look ahead task is found, - // it can't be loaded as scheduled queue max read level can't move - // forward. - p.lookAheadTask() -} - func (p *scheduledQueue) lookAheadTask() { rateLimitCtx, rateLimitCancel := context.WithTimeout(context.Background(), lookAheadRateLimitDelay) rateLimitErr := p.readerRateLimiter.Wait(rateLimitCtx, p.lookAheadRateLimitRequest) diff --git a/service/history/queues/queue_scheduled_test.go b/service/history/queues/queue_scheduled_test.go index 3b411f7d267..ab1726f2d3a 100644 --- a/service/history/queues/queue_scheduled_test.go +++ b/service/history/queues/queue_scheduled_test.go @@ -245,19 +245,6 @@ func (s *scheduledQueueSuite) TestLookAheadTask_ErrorLookAhead() { } } -func (s *scheduledQueueSuite) TestProcessNewRange_LookAheadPerformed() { - timerGate := timer.NewRemoteGate() - s.scheduledQueue.timerGate = timerGate - - // test if look ahead if performed after processing new range - s.mockExecutionManager.EXPECT().GetHistoryTasks(gomock.Any(), gomock.Any()).Return(&persistence.GetHistoryTasksResponse{ - Tasks: []tasks.Task{}, - NextPageToken: nil, - }, nil).Times(1) - - s.scheduledQueue.processNewRange() -} - func (s *scheduledQueueSuite) setupLookAheadMock( hasLookAheadTask bool, ) (lookAheadRange Range, lookAheadTask *tasks.MockTask) { diff --git a/service/history/queues/reader.go b/service/history/queues/reader.go index aaf4d9217f9..35361712e17 100644 --- a/service/history/queues/reader.go +++ b/service/history/queues/reader.go @@ -76,6 +76,8 @@ type ( SlicePredicate func(s Slice) bool + ReaderCompletionFn func(readerID int32) + ReaderImpl struct { sync.Mutex @@ -86,6 +88,7 @@ type ( timeSource clock.TimeSource ratelimiter quotas.RequestRateLimiter monitor Monitor + completionFn ReaderCompletionFn logger log.Logger metricsHandler metrics.Handler @@ -106,6 +109,10 @@ type ( } ) +var ( + NoopReaderCompletionFn = func(_ int32) {} +) + func NewReader( readerID int32, slices []Slice, @@ -115,6 +122,7 @@ func NewReader( timeSource clock.TimeSource, ratelimiter quotas.RequestRateLimiter, monitor Monitor, + completionFn ReaderCompletionFn, logger log.Logger, metricsHandler metrics.Handler, ) *ReaderImpl { @@ -134,6 +142,7 @@ func NewReader( timeSource: timeSource, ratelimiter: ratelimiter, monitor: monitor, + completionFn: completionFn, logger: log.With(logger, tag.QueueReaderID(readerID)), metricsHandler: metricsHandler, @@ -427,6 +436,7 @@ func (r *ReaderImpl) loadAndSubmitTasks() { } if r.nextReadSlice == nil { + r.completionFn(r.readerID) return } @@ -457,7 +467,11 @@ func (r *ReaderImpl) loadAndSubmitTasks() { if r.nextReadSlice = r.nextReadSlice.Next(); r.nextReadSlice != nil { r.notify() + return } + + // no more task to load, trigger completion callback + r.completionFn(r.readerID) } func (r *ReaderImpl) resetNextReadSliceLocked() { @@ -471,7 +485,11 @@ func (r *ReaderImpl) resetNextReadSliceLocked() { if r.nextReadSlice != nil { r.notify() + return } + + // no more task to load, trigger completion callback + r.completionFn(r.readerID) } func (r *ReaderImpl) notify() { diff --git a/service/history/queues/reader_test.go b/service/history/queues/reader_test.go index bdc6fd6d144..2933f34a90a 100644 --- a/service/history/queues/reader_test.go +++ b/service/history/queues/reader_test.go @@ -111,7 +111,7 @@ func (s *readerSuite) TestStartLoadStop() { }).Times(1) s.mockRescheduler.EXPECT().Len().Return(0).AnyTimes() - reader := s.newTestReader(scopes, paginationFnProvider) + reader := s.newTestReader(scopes, paginationFnProvider, NoopReaderCompletionFn) mockTimeSource := clock.NewEventTimeSource() mockTimeSource.Update(scopes[0].Range.ExclusiveMax.FireTime) reader.timeSource = mockTimeSource @@ -124,7 +124,7 @@ func (s *readerSuite) TestStartLoadStop() { func (s *readerSuite) TestScopes() { scopes := NewRandomScopes(10) - reader := s.newTestReader(scopes, nil) + reader := s.newTestReader(scopes, nil, NoopReaderCompletionFn) actualScopes := reader.Scopes() for idx, expectedScope := range scopes { s.True(expectedScope.Equals(actualScopes[idx])) @@ -133,7 +133,7 @@ func (s *readerSuite) TestScopes() { func (s *readerSuite) TestSplitSlices() { scopes := NewRandomScopes(3) - reader := s.newTestReader(scopes, nil) + reader := s.newTestReader(scopes, nil, NoopReaderCompletionFn) splitter := func(s Slice) ([]Slice, bool) { // split head @@ -174,7 +174,7 @@ func (s *readerSuite) TestSplitSlices() { func (s *readerSuite) TestMergeSlices() { scopes := NewRandomScopes(rand.Intn(10)) - reader := s.newTestReader(scopes, nil) + reader := s.newTestReader(scopes, nil, NoopReaderCompletionFn) incomingScopes := NewRandomScopes(rand.Intn(10)) incomingSlices := make([]Slice, 0, len(incomingScopes)) @@ -201,7 +201,7 @@ func (s *readerSuite) TestAppendSlices() { totalScopes := 10 scopes := NewRandomScopes(totalScopes) currentScopes := scopes[:totalScopes/2] - reader := s.newTestReader(currentScopes, nil) + reader := s.newTestReader(currentScopes, nil, NoopReaderCompletionFn) incomingScopes := scopes[totalScopes/2:] incomingSlices := make([]Slice, 0, len(incomingScopes)) @@ -235,7 +235,7 @@ func (s *readerSuite) TestShrinkSlices() { scopes[idx].Range.InclusiveMin = scopes[idx].Range.ExclusiveMax } - reader := s.newTestReader(scopes, nil) + reader := s.newTestReader(scopes, nil, NoopReaderCompletionFn) reader.ShrinkSlices() actualScopes := reader.Scopes() @@ -265,7 +265,7 @@ func (s *readerSuite) TestThrottle() { } } - reader := s.newTestReader(scopes, paginationFnProvider) + reader := s.newTestReader(scopes, paginationFnProvider, NoopReaderCompletionFn) mockTimeSource := clock.NewEventTimeSource() mockTimeSource.Update(scopes[0].Range.ExclusiveMax.FireTime) reader.timeSource = mockTimeSource @@ -293,19 +293,22 @@ func (s *readerSuite) TestThrottle() { func (s *readerSuite) TestLoadAndSubmitTasks_Throttled() { scopes := NewRandomScopes(1) - reader := s.newTestReader(scopes, nil) + completionFnCalled := false + reader := s.newTestReader(scopes, nil, func(_ int32) { completionFnCalled = true }) reader.Pause(100 * time.Millisecond) s.mockRescheduler.EXPECT().Len().Return(0).AnyTimes() // should be no-op reader.loadAndSubmitTasks() + s.False(completionFnCalled) } func (s *readerSuite) TestLoadAndSubmitTasks_TooManyPendingTasks() { scopes := NewRandomScopes(1) - reader := s.newTestReader(scopes, nil) + completionFnCalled := false + reader := s.newTestReader(scopes, nil, func(_ int32) { completionFnCalled = true }) s.monitor.SetSlicePendingTaskCount( reader.slices.Front().Value.(Slice), @@ -314,6 +317,7 @@ func (s *readerSuite) TestLoadAndSubmitTasks_TooManyPendingTasks() { // should be no-op reader.loadAndSubmitTasks() + s.False(completionFnCalled) } func (s *readerSuite) TestLoadAndSubmitTasks_MoreTasks() { @@ -333,7 +337,8 @@ func (s *readerSuite) TestLoadAndSubmitTasks_MoreTasks() { } } - reader := s.newTestReader(scopes, paginationFnProvider) + completionFnCalled := false + reader := s.newTestReader(scopes, paginationFnProvider, func(_ int32) { completionFnCalled = true }) mockTimeSource := clock.NewEventTimeSource() mockTimeSource.Update(scopes[0].Range.ExclusiveMax.FireTime) reader.timeSource = mockTimeSource @@ -349,6 +354,7 @@ func (s *readerSuite) TestLoadAndSubmitTasks_MoreTasks() { <-reader.notifyCh // should trigger next round of load s.Equal(reader.options.BatchSize(), taskSubmitted) s.True(scopes[0].Equals(reader.nextReadSlice.Value.(Slice).Scope())) + s.False(completionFnCalled) } func (s *readerSuite) TestLoadAndSubmitTasks_NoMoreTasks_HasNextSlice() { @@ -363,7 +369,8 @@ func (s *readerSuite) TestLoadAndSubmitTasks_NoMoreTasks_HasNextSlice() { } } - reader := s.newTestReader(scopes, paginationFnProvider) + completionFnCalled := false + reader := s.newTestReader(scopes, paginationFnProvider, func(_ int32) { completionFnCalled = true }) mockTimeSource := clock.NewEventTimeSource() mockTimeSource.Update(scopes[0].Range.ExclusiveMax.FireTime) reader.timeSource = mockTimeSource @@ -379,6 +386,7 @@ func (s *readerSuite) TestLoadAndSubmitTasks_NoMoreTasks_HasNextSlice() { <-reader.notifyCh // should trigger next round of load s.Equal(1, taskSubmitted) s.True(scopes[1].Equals(reader.nextReadSlice.Value.(Slice).Scope())) + s.False(completionFnCalled) } func (s *readerSuite) TestLoadAndSubmitTasks_NoMoreTasks_NoNextSlice() { @@ -393,7 +401,8 @@ func (s *readerSuite) TestLoadAndSubmitTasks_NoMoreTasks_NoNextSlice() { } } - reader := s.newTestReader(scopes, paginationFnProvider) + completionFnCalled := false + reader := s.newTestReader(scopes, paginationFnProvider, func(_ int32) { completionFnCalled = true }) mockTimeSource := clock.NewEventTimeSource() mockTimeSource.Update(scopes[0].Range.ExclusiveMax.FireTime) reader.timeSource = mockTimeSource @@ -414,12 +423,13 @@ func (s *readerSuite) TestLoadAndSubmitTasks_NoMoreTasks_NoNextSlice() { } s.Equal(1, taskSubmitted) s.Nil(reader.nextReadSlice) + s.True(completionFnCalled) } func (s *readerSuite) TestSubmitTask() { r := NewRandomRange() scopes := []Scope{NewScope(r, predicates.Universal[tasks.Task]())} - reader := s.newTestReader(scopes, nil) + reader := s.newTestReader(scopes, nil, NoopReaderCompletionFn) mockExecutable := NewMockExecutable(s.controller) @@ -457,6 +467,7 @@ func (s *readerSuite) validateSlicesOrdered( func (s *readerSuite) newTestReader( scopes []Scope, paginationFnProvider PaginationFnProvider, + completionFn ReaderCompletionFn, ) *ReaderImpl { slices := make([]Slice, 0, len(scopes)) for _, scope := range scopes { @@ -477,6 +488,7 @@ func (s *readerSuite) newTestReader( clock.NewRealTimeSource(), NewReaderPriorityRateLimiter(func() float64 { return 20 }, 1), s.monitor, + completionFn, s.logger, s.metricsHandler, )