diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index 6d0b83cd300..0e0e11266d0 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -82,6 +82,7 @@ var keys = map[Key]string{ TimerProcessorUpdateShardTaskCount: "history.timerProcessorUpdateShardTaskCount", TimerProcessorUpdateAckInterval: "history.timerProcessorUpdateAckInterval", TimerProcessorCompleteTimerInterval: "history.timerProcessorCompleteTimerInterval", + TimerProcessorMaxPollRPS: "history.timerProcessorMaxPollRPS", TimerProcessorMaxPollInterval: "history.timerProcessorMaxPollInterval", TimerProcessorMaxPollIntervalJitterCoefficient: "history.timerProcessorMaxPollIntervalJitterCoefficient", TimerProcessorStandbyTaskDelay: "history.timerProcessorStandbyTaskDelay", @@ -190,6 +191,8 @@ const ( TimerProcessorUpdateAckInterval // TimerProcessorCompleteTimerInterval is complete timer interval for timer processor TimerProcessorCompleteTimerInterval + // TimerProcessorMaxPollRPS is max poll rate per second for timer processor + TimerProcessorMaxPollRPS // TimerProcessorMaxPollInterval is max poll interval for timer processor TimerProcessorMaxPollInterval // TimerProcessorMaxPollIntervalJitterCoefficient is the max poll interval jitter coefficient diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go index 894a48a175b..60110c59f2d 100644 --- a/service/history/historyReplicator.go +++ b/service/history/historyReplicator.go @@ -194,7 +194,7 @@ func (r *historyReplicator) ApplyStartEvent(context *workflowExecutionContext, r msBuilder := r.getNewMutableState(request.GetVersion(), logger) err := r.ApplyReplicationTask(context, msBuilder, request, logger) if err != nil { - logger.Errorf("Fail to Apply Replication task. NextEvent: %v, FirstEvent: %v, Err: %v", msBuilder.GetNextEventID(), + logger.Debugf("Fail to Apply Replication task. 
NextEvent: %v, FirstEvent: %v, Err: %v", msBuilder.GetNextEventID(), request.GetFirstEventId(), err) } return err diff --git a/service/history/queueProcessor.go b/service/history/queueProcessor.go index b1c2fb625ee..0e29a0b888a 100644 --- a/service/history/queueProcessor.go +++ b/service/history/queueProcessor.go @@ -63,6 +63,8 @@ type ( // worker coroutines notification workerNotificationChans []chan struct{} + lastPollTime time.Time + notifyCh chan struct{} status int32 shutdownWG sync.WaitGroup @@ -92,6 +94,7 @@ func newQueueProcessorBase(shard ShardContext, options *QueueProcessorOptions, p metricsClient: shard.GetMetricsClient(), logger: logger, ackMgr: queueAckMgr, + lastPollTime: time.Time{}, } return p @@ -145,7 +148,6 @@ func (p *queueProcessorBase) processorPump() { } jitter := backoff.NewJitter() - lastPollTime := time.Time{} pollTimer := time.NewTimer(jitter.JitDuration(p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient())) updateAckTimer := time.NewTimer(p.options.UpdateAckInterval()) @@ -159,12 +161,10 @@ processorPumpLoop: go p.Stop() case <-p.notifyCh: p.processBatch(tasksCh) - lastPollTime = time.Now() case <-pollTimer.C: pollTimer.Reset(jitter.JitDuration(p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient())) - if lastPollTime.Add(p.options.MaxPollInterval()).Before(time.Now()) { + if p.lastPollTime.Add(p.options.MaxPollInterval()).Before(time.Now()) { p.processBatch(tasksCh) - lastPollTime = time.Now() } case <-updateAckTimer.C: p.ackMgr.updateQueueAckLevel() @@ -189,6 +189,7 @@ func (p *queueProcessorBase) processBatch(tasksCh chan<- queueTaskInfo) { return } + p.lastPollTime = time.Now() tasks, more, err := p.ackMgr.readQueueTasks() if err != nil { diff --git a/service/history/service.go b/service/history/service.go index fcab80ba3bc..dd975de8e56 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -55,16 +55,17 @@ type Config struct { TimerProcessorCompleteTimerFailureRetryCount 
dynamicconfig.IntPropertyFn TimerProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn TimerProcessorCompleteTimerInterval dynamicconfig.DurationPropertyFn + TimerProcessorMaxPollRPS dynamicconfig.IntPropertyFn TimerProcessorMaxPollInterval dynamicconfig.DurationPropertyFn TimerProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn TimerProcessorStandbyTaskDelay dynamicconfig.DurationPropertyFn // TransferQueueProcessor settings TransferTaskBatchSize dynamicconfig.IntPropertyFn - TransferProcessorMaxPollRPS dynamicconfig.IntPropertyFn TransferTaskWorkerCount dynamicconfig.IntPropertyFn TransferTaskMaxRetryCount dynamicconfig.IntPropertyFn TransferProcessorCompleteTransferFailureRetryCount dynamicconfig.IntPropertyFn + TransferProcessorMaxPollRPS dynamicconfig.IntPropertyFn TransferProcessorMaxPollInterval dynamicconfig.DurationPropertyFn TransferProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn TransferProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn @@ -111,11 +112,12 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config { TimerProcessorCompleteTimerFailureRetryCount: dc.GetIntProperty(dynamicconfig.TimerProcessorCompleteTimerFailureRetryCount, 10), TimerProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorUpdateAckInterval, 5*time.Second), TimerProcessorCompleteTimerInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorCompleteTimerInterval, 3*time.Second), + TimerProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.TimerProcessorMaxPollRPS, 20), TimerProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorMaxPollInterval, 5*time.Minute), TimerProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TimerProcessorMaxPollIntervalJitterCoefficient, 0.15), TimerProcessorStandbyTaskDelay: dc.GetDurationProperty(dynamicconfig.TimerProcessorStandbyTaskDelay, 0*time.Minute), TransferTaskBatchSize: 
dc.GetIntProperty(dynamicconfig.TransferTaskBatchSize, 100), - TransferProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.TransferProcessorMaxPollRPS, 100), + TransferProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.TransferProcessorMaxPollRPS, 20), TransferTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TransferTaskWorkerCount, 10), TransferTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TransferTaskMaxRetryCount, 100), TransferProcessorCompleteTransferFailureRetryCount: dc.GetIntProperty(dynamicconfig.TransferProcessorCompleteTransferFailureRetryCount, 10), @@ -127,7 +129,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config { ReplicatorTaskBatchSize: dc.GetIntProperty(dynamicconfig.ReplicatorTaskBatchSize, 100), ReplicatorTaskWorkerCount: dc.GetIntProperty(dynamicconfig.ReplicatorTaskWorkerCount, 10), ReplicatorTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.ReplicatorTaskMaxRetryCount, 100), - ReplicatorProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.ReplicatorProcessorMaxPollRPS, 100), + ReplicatorProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.ReplicatorProcessorMaxPollRPS, 20), ReplicatorProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.ReplicatorProcessorMaxPollInterval, 1*time.Minute), ReplicatorProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicatorProcessorMaxPollIntervalJitterCoefficient, 0.15), ReplicatorProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.ReplicatorProcessorUpdateAckInterval, 5*time.Second), diff --git a/service/history/timerGate.go b/service/history/timerGate.go index 011e307998a..fc28a1cae29 100644 --- a/service/history/timerGate.go +++ b/service/history/timerGate.go @@ -76,10 +76,10 @@ type ( // lock for timer and next wake up time sync.Mutex + // view of current time + currentTime time.Time // time which timer will fire nextWakeupTime time.Time - // nextWakeupTime already fired or not - nextWakeupTimeFired bool } ) @@ -155,9 
+155,9 @@ func (timerGate *LocalTimerGateImpl) Close() { // NewRemoteTimerGate create a new timer gate instance func NewRemoteTimerGate() RemoteTimerGate { timer := &RemoteTimerGateImpl{ - nextWakeupTime: time.Time{}, - nextWakeupTimeFired: true, // this should be true so update API can set the next timer - fireChan: make(chan struct{}, 1), + currentTime: time.Time{}, + nextWakeupTime: time.Time{}, + fireChan: make(chan struct{}, 1), } return timer } @@ -171,7 +171,9 @@ func (timerGate *RemoteTimerGateImpl) FireChan() <-chan struct{} { func (timerGate *RemoteTimerGateImpl) FireAfter(now time.Time) bool { timerGate.Lock() defer timerGate.Unlock() - return timerGate.nextWakeupTime.After(now) + + active := timerGate.currentTime.Before(timerGate.nextWakeupTime) + return active && timerGate.nextWakeupTime.After(now) } // Update update the timer gate, return true if update is a success @@ -180,33 +182,69 @@ func (timerGate *RemoteTimerGateImpl) Update(nextTime time.Time) bool { timerGate.Lock() defer timerGate.Unlock() - if timerGate.nextWakeupTimeFired || timerGate.nextWakeupTime.After(nextTime) { - // if timer will not fire or next wake time is after the "next" - // then we need to update the timer to fire + active := timerGate.currentTime.Before(timerGate.nextWakeupTime) + if active { + if timerGate.nextWakeupTime.Before(nextTime) { + // current time < next wake up time < next time + return false + } + + if timerGate.currentTime.Before(nextTime) { + // current time < next time <= next wake up time + timerGate.nextWakeupTime = nextTime + return true + } + + // next time <= current time < next wake up time timerGate.nextWakeupTime = nextTime - timerGate.nextWakeupTimeFired = false - // Notifies caller that next notification is reset to fire at passed in 'next' visibility time + timerGate.fire() return true } - return false + // this means the timer, before stopped, has already fired / never active + if !timerGate.currentTime.Before(nextTime) { + // next time is <= 
current time, need to fire immediately + // whether to update next wake up time or not is irrelevant + timerGate.fire() + } else { + // next time > current time + timerGate.nextWakeupTime = nextTime + } + return true } // SetCurrentTime set the current time, and additionally fire the fire chan // if new "current" time is after the next wake up time, return true if -// "current" is antually updated +// "current" is actually updated func (timerGate *RemoteTimerGateImpl) SetCurrentTime(currentTime time.Time) bool { timerGate.Lock() defer timerGate.Unlock() - if !timerGate.nextWakeupTimeFired && !currentTime.Before(timerGate.nextWakeupTime) { - timerGate.nextWakeupTimeFired = true - select { - case timerGate.fireChan <- struct{}{}: - // timer successfully triggered - default: - // timer already triggered, pass - } + + if !timerGate.currentTime.Before(currentTime) { + // new current time is <= current time + return false + } + + // NOTE: do not update the current time now + if !timerGate.currentTime.Before(timerGate.nextWakeupTime) { + // current time already >= next wakeup time + // avoid duplicate fire + timerGate.currentTime = currentTime return true } - return false + timerGate.currentTime = currentTime + if !timerGate.currentTime.Before(timerGate.nextWakeupTime) { + timerGate.fire() + } + return true +} + +func (timerGate *RemoteTimerGateImpl) fire() { + select { + case timerGate.fireChan <- struct{}{}: + // timer successfully triggered + default: + // timer already triggered, pass + } } diff --git a/service/history/timerGate_test.go b/service/history/timerGate_test.go index ea0819eda58..622490feb92 100644 --- a/service/history/timerGate_test.go +++ b/service/history/timerGate_test.go @@ -91,6 +91,7 @@ func (s *remoteTimerGateSuite) TearDownSuite() { func (s *remoteTimerGateSuite) SetupTest() { s.currentTime = time.Now().Add(-10 * time.Minute) s.remoteTimerGate = NewRemoteTimerGate() + s.remoteTimerGate.SetCurrentTime(s.currentTime) } func (s 
*remoteTimerGateSuite) TearDownTest() { @@ -99,70 +100,180 @@ func (s *remoteTimerGateSuite) TearDownTest() { func (s *localTimerGateSuite) TestTimerFire() { now := time.Now() - timerDelay := now.Add(1 * time.Second) - deadlineDelay := now.Add(2 * time.Second) - s.localTimerGate.Update(timerDelay) + newTimer := now.Add(1 * time.Second) + deadline := now.Add(2 * time.Second) + s.localTimerGate.Update(newTimer) select { case <-s.localTimerGate.FireChan(): - case <-time.NewTimer(deadlineDelay.Sub(now)).C: + case <-time.NewTimer(deadline.Sub(now)).C: s.Fail("timer should fire before test deadline") } } -func (s *localTimerGateSuite) TestTimerFireAfterUpdate_Updated() { +func (s *localTimerGateSuite) TestTimerFireAfterUpdate_Active_Updated_BeforeNow() { now := time.Now() - timerDelay := now.Add(5 * time.Second) - updatedTimerDelay := now.Add(1 * time.Second) - deadlineDelay := now.Add(3 * time.Second) - s.localTimerGate.Update(timerDelay) - s.True(s.localTimerGate.Update(updatedTimerDelay)) + newTimer := now.Add(9 * time.Second) + updatedNewTimer := now.Add(-1 * time.Second) + deadline := now.Add(3 * time.Second) + s.localTimerGate.Update(newTimer) select { case <-s.localTimerGate.FireChan(): - case <-time.NewTimer(deadlineDelay.Sub(now)).C: + s.Fail("timer should not fire when current time not updated") + case <-time.NewTimer(deadline.Sub(now)).C: + } + + s.True(s.localTimerGate.Update(updatedNewTimer)) + select { + case <-s.localTimerGate.FireChan(): + case <-time.NewTimer(deadline.Sub(now)).C: + s.Fail("timer should fire before test deadline") + } +} + +func (s *localTimerGateSuite) TestTimerFireAfterUpdate_Active_Updated() { + now := time.Now() + newTimer := now.Add(5 * time.Second) + updatedNewTimer := now.Add(1 * time.Second) + deadline := now.Add(3 * time.Second) + s.localTimerGate.Update(newTimer) + s.True(s.localTimerGate.Update(updatedNewTimer)) + + select { + case <-s.localTimerGate.FireChan(): + case <-time.NewTimer(deadline.Sub(now)).C: + s.Fail("timer 
should fire before test deadline") + } +} + +func (s *localTimerGateSuite) TestTimerFireAfterUpdate_Active_NotUpdated() { + now := time.Now() + newTimer := now.Add(1 * time.Second) + updatedNewTimer := now.Add(3 * time.Second) + deadline := now.Add(2 * time.Second) + s.localTimerGate.Update(newTimer) + s.False(s.localTimerGate.Update(updatedNewTimer)) + + select { + case <-s.localTimerGate.FireChan(): + case <-time.NewTimer(deadline.Sub(now)).C: s.Fail("timer should fire before test deadline") } } -func (s *localTimerGateSuite) TestTimerFireAfterUpdate_NotUpdated() { +func (s *localTimerGateSuite) TestTimerFireAfterUpdate_NotActive_Updated() { now := time.Now() - timerDelay := now.Add(1 * time.Second) - updatedTimerDelay := now.Add(3 * time.Second) - deadlineDelay := now.Add(2 * time.Second) - s.localTimerGate.Update(timerDelay) - s.False(s.localTimerGate.Update(updatedTimerDelay)) + newTimer := now.Add(-5 * time.Second) + updatedNewTimer := now.Add(1 * time.Second) + deadline := now.Add(3 * time.Second) + s.localTimerGate.Update(newTimer) + select { // this is to drain existing signal + case <-s.localTimerGate.FireChan(): + } + // test setup up complete + + s.True(s.localTimerGate.Update(updatedNewTimer)) select { case <-s.localTimerGate.FireChan(): - case <-time.NewTimer(deadlineDelay.Sub(now)).C: + case <-time.NewTimer(deadline.Sub(now)).C: s.Fail("timer should fire before test deadline") } } +func (s *localTimerGateSuite) TestTimerFireAfterUpdate_NotActive_NotUpdated() { + now := time.Now() + newTimer := now.Add(-5 * time.Second) + updatedNewTimer := now.Add(-1 * time.Second) + deadline := now.Add(1 * time.Second) + + s.localTimerGate.Update(newTimer) + select { // this is to drain existing signal + case <-s.localTimerGate.FireChan(): + } + // test setup up complete + + s.True(s.localTimerGate.Update(updatedNewTimer)) + select { + case <-s.localTimerGate.FireChan(): + case <-time.NewTimer(deadline.Sub(now)).C: + s.Fail("timer should fire before test deadline") 
+ } +} + +func (s *localTimerGateSuite) TestTimerWillFire_Zero() { + // this test is to validate initial notification will trigger a scan of timer + s.localTimerGate.Update(time.Time{}) + s.False(s.localTimerGate.FireAfter(time.Now())) +} + func (s *localTimerGateSuite) TestTimerWillFire() { now := time.Now() - timerDelay := now.Add(2 * time.Second) - timeBeforeTimer := now.Add(1 * time.Second) - timeAfterTimer := now.Add(3 * time.Second) - s.localTimerGate.Update(timerDelay) - s.True(s.localTimerGate.FireAfter(timeBeforeTimer)) - s.False(s.localTimerGate.FireAfter(timeAfterTimer)) + newTimer := now.Add(2 * time.Second) + timeBeforeNewTimer := now.Add(1 * time.Second) + timeAfterNewTimer := now.Add(3 * time.Second) + s.localTimerGate.Update(newTimer) + s.True(s.localTimerGate.FireAfter(timeBeforeNewTimer)) + s.False(s.localTimerGate.FireAfter(timeAfterNewTimer)) } func (s *remoteTimerGateSuite) TestTimerFire() { now := s.currentTime - timerDelay := now.Add(1 * time.Second) - deadlineDelay := now.Add(2 * time.Second) - s.remoteTimerGate.Update(timerDelay) + newTimer := now.Add(1 * time.Second) + deadline := now.Add(2 * time.Second) + s.remoteTimerGate.Update(newTimer) + + select { + case <-s.remoteTimerGate.FireChan(): + s.Fail("timer should not fire when current time not updated") + case <-time.NewTimer(deadline.Sub(now)).C: + } + + s.remoteTimerGate.SetCurrentTime(deadline) + select { + case <-s.remoteTimerGate.FireChan(): + default: + s.Fail("timer should fire") + } +} + +func (s *remoteTimerGateSuite) TestTimerFireAfterUpdate_Active_Updated_BeforeNow() { + now := s.currentTime + newTimer := now.Add(5 * time.Second) + updatedNewTimer := now.Add(-1 * time.Second) + deadline := now.Add(3 * time.Second) + + s.remoteTimerGate.Update(newTimer) + select { + case <-s.remoteTimerGate.FireChan(): + s.Fail("timer should not fire when current time not updated") + case <-time.NewTimer(deadline.Sub(now)).C: + } + + s.True(s.remoteTimerGate.Update(updatedNewTimer)) + select { 
+ case <-s.remoteTimerGate.FireChan(): + default: + s.Fail("timer should fire") + } +} + +func (s *remoteTimerGateSuite) TestTimerFireAfterUpdate_Active_Updated() { + now := s.currentTime + newTimer := now.Add(5 * time.Second) + updatedNewTimer := now.Add(1 * time.Second) + deadline := now.Add(3 * time.Second) + s.remoteTimerGate.Update(newTimer) + s.True(s.remoteTimerGate.Update(updatedNewTimer)) select { case <-s.remoteTimerGate.FireChan(): s.Fail("timer should not fire when current time not updated") - case <-time.NewTimer(deadlineDelay.Sub(now)).C: + case <-time.NewTimer(deadline.Sub(now)).C: } - s.remoteTimerGate.SetCurrentTime(deadlineDelay) + s.remoteTimerGate.SetCurrentTime(updatedNewTimer) select { case <-s.remoteTimerGate.FireChan(): default: @@ -170,21 +281,21 @@ func (s *remoteTimerGateSuite) TestTimerFire() { } } -func (s *remoteTimerGateSuite) TestTimerFireAfterUpdate_Updated() { +func (s *remoteTimerGateSuite) TestTimerFireAfterUpdate_Active_NotUpdated() { now := s.currentTime - timerDelay := now.Add(5 * time.Second) - updatedTimerDelay := now.Add(1 * time.Second) - deadlineDelay := now.Add(3 * time.Second) - s.remoteTimerGate.Update(timerDelay) - s.True(s.remoteTimerGate.Update(updatedTimerDelay)) + newTimer := now.Add(1 * time.Second) + updatedNewTimer := now.Add(3 * time.Second) + deadline := now.Add(2 * time.Second) + s.remoteTimerGate.Update(newTimer) + s.False(s.remoteTimerGate.Update(updatedNewTimer)) select { case <-s.remoteTimerGate.FireChan(): s.Fail("timer should not fire when current time not updated") - case <-time.NewTimer(deadlineDelay.Sub(now)).C: + case <-time.NewTimer(deadline.Sub(now)).C: } - s.remoteTimerGate.SetCurrentTime(updatedTimerDelay) + s.remoteTimerGate.SetCurrentTime(updatedNewTimer) select { case <-s.remoteTimerGate.FireChan(): default: @@ -192,21 +303,26 @@ func (s *remoteTimerGateSuite) TestTimerFireAfterUpdate_Updated() { } } -func (s *remoteTimerGateSuite) TestTimerFireAfterUpdate_NotUpdated() { +func (s 
*remoteTimerGateSuite) TestTimerFireAfterUpdate_NotActive_Updated() { now := s.currentTime - timerDelay := now.Add(1 * time.Second) - updatedTimerDelay := now.Add(3 * time.Second) - deadlineDelay := now.Add(2 * time.Second) - s.remoteTimerGate.Update(timerDelay) - s.False(s.remoteTimerGate.Update(updatedTimerDelay)) + newTimer := now.Add(-5 * time.Second) + updatedNewTimer := now.Add(1 * time.Second) + deadline := now.Add(2 * time.Second) + s.remoteTimerGate.Update(newTimer) + select { // this is to drain existing signal + case <-s.remoteTimerGate.FireChan(): + } + // test setup up complete + + s.True(s.remoteTimerGate.Update(updatedNewTimer)) select { case <-s.remoteTimerGate.FireChan(): s.Fail("timer should not fire when current time not updated") - case <-time.NewTimer(deadlineDelay.Sub(now)).C: + case <-time.NewTimer(deadline.Sub(now)).C: } - s.remoteTimerGate.SetCurrentTime(updatedTimerDelay) + s.remoteTimerGate.SetCurrentTime(updatedNewTimer) select { case <-s.remoteTimerGate.FireChan(): default: @@ -214,33 +330,114 @@ func (s *remoteTimerGateSuite) TestTimerFireAfterUpdate_NotUpdated() { } } -func (s *remoteTimerGateSuite) TestTimerFireOnlyOnce() { +func (s *remoteTimerGateSuite) TestTimerFireAfterUpdate_NotActive_NotUpdated() { now := s.currentTime - timerDelay := now.Add(1 * time.Second) - deadlineDelay := now.Add(2 * time.Second) - s.remoteTimerGate.Update(timerDelay) + newTimer := now.Add(-5 * time.Second) + updatedNewTimer := now.Add(-1 * time.Second) + + s.remoteTimerGate.Update(newTimer) + select { // this is to drain existing signal + case <-s.remoteTimerGate.FireChan(): + } + // test setup up complete + + s.True(s.remoteTimerGate.Update(updatedNewTimer)) + select { + case <-s.remoteTimerGate.FireChan(): + default: + s.Fail("timer should fire when new timer is in the past") + } +} - s.True(s.remoteTimerGate.SetCurrentTime(deadlineDelay)) +func (s *remoteTimerGateSuite) TestTimerSetCurrentTime_NoUpdate() { + now := s.currentTime + newCurrentTime := 
now.Add(-1 * time.Second) + s.False(s.remoteTimerGate.SetCurrentTime(newCurrentTime)) + select { + case <-s.remoteTimerGate.FireChan(): + s.Fail("timer should not fire") + default: + } +} + +func (s *remoteTimerGateSuite) TestTimerSetCurrentTime_Update_TimerAlreadyFired() { + now := s.currentTime + newTimer := now.Add(-1 * time.Second) + newCurrentTime := now.Add(1 * time.Second) + + s.remoteTimerGate.Update(newTimer) + select { // this is to drain existing signal + case <-s.remoteTimerGate.FireChan(): + } + // test setup up complete + + s.True(s.remoteTimerGate.SetCurrentTime(newCurrentTime)) + select { + case <-s.remoteTimerGate.FireChan(): + s.Fail("timer should not fire") + default: + } +} + +func (s *remoteTimerGateSuite) TestTimerSetCurrentTime_Update_TimerNotFired() { + now := s.currentTime + newTimer := now.Add(2 * time.Second) + newCurrentTime := now.Add(1 * time.Second) + + s.remoteTimerGate.Update(newTimer) + s.True(s.remoteTimerGate.SetCurrentTime(newCurrentTime)) + select { + case <-s.remoteTimerGate.FireChan(): + s.Fail("timer should not fire") + default: + } +} + +func (s *remoteTimerGateSuite) TestTimerSetCurrentTime_Update_TimerFired() { + now := s.currentTime + newTimer := now.Add(2 * time.Second) + newCurrentTime := now.Add(2 * time.Second) + + s.remoteTimerGate.Update(newTimer) + s.True(s.remoteTimerGate.SetCurrentTime(newCurrentTime)) select { case <-s.remoteTimerGate.FireChan(): default: s.Fail("timer should fire") } - s.False(s.remoteTimerGate.SetCurrentTime(deadlineDelay)) + // should fire only once + newCurrentTime = newCurrentTime.Add(1 * time.Second) + s.True(s.remoteTimerGate.SetCurrentTime(newCurrentTime)) select { case <-s.remoteTimerGate.FireChan(): - s.Fail("timer should not fire twice") + s.Fail("timer should not fire") default: } } -func (s *remoteTimerGateSuite) TestTimerWillFire() { +func (s *remoteTimerGateSuite) TestTimerWillFire_Zero() { + // this test is to validate initial notification will trigger a scan of timer + 
s.remoteTimerGate.Update(time.Time{}) + s.False(s.remoteTimerGate.FireAfter(time.Now())) +} + +func (s *remoteTimerGateSuite) TestTimerWillFire_Active() { + now := s.currentTime + newTimer := now.Add(2 * time.Second) + timeBeforeNewTimer := now.Add(1 * time.Second) + timeAfterNewimer := now.Add(3 * time.Second) + s.remoteTimerGate.Update(newTimer) + s.True(s.remoteTimerGate.FireAfter(timeBeforeNewTimer)) + s.False(s.remoteTimerGate.FireAfter(timeAfterNewimer)) +} + +func (s *remoteTimerGateSuite) TestTimerWillFire_NotActive() { now := s.currentTime - timerDelay := now.Add(2 * time.Second) - timeBeforeTimer := now.Add(1 * time.Second) - timeAfterTimer := now.Add(3 * time.Second) - s.remoteTimerGate.Update(timerDelay) - s.True(s.remoteTimerGate.FireAfter(timeBeforeTimer)) + newTimer := now.Add(-2 * time.Second) + timeBeforeTimer := now.Add(-3 * time.Second) + timeAfterTimer := now.Add(1 * time.Second) + s.remoteTimerGate.Update(newTimer) + s.False(s.remoteTimerGate.FireAfter(timeBeforeTimer)) s.False(s.remoteTimerGate.FireAfter(timeAfterTimer)) } diff --git a/service/history/timerQueueActiveProcessor.go b/service/history/timerQueueActiveProcessor.go index 59afacea1ba..626f78851e9 100644 --- a/service/history/timerQueueActiveProcessor.go +++ b/service/history/timerQueueActiveProcessor.go @@ -63,9 +63,6 @@ func newTimerQueueActiveProcessor(shard ShardContext, historyService *historyEng return verifyActiveTask(shard, logger, timer.DomainID, timer) } - timerGate := NewLocalTimerGate() - // this will trigger a timer gate fire event immediately - timerGate.Update(time.Time{}) timerQueueAckMgr := newTimerQueueAckMgr(metrics.TimerActiveQueueProcessorScope, shard, historyService.metricsClient, currentClusterName, logger) retryableMatchingClient := matching.NewRetryableClient(matchingClient, common.CreateMatchingRetryPolicy(), common.IsMatchingServiceTransientError) @@ -79,7 +76,7 @@ func newTimerQueueActiveProcessor(shard ShardContext, historyService *historyEng 
matchingClient: retryableMatchingClient, metricsClient: historyService.metricsClient, currentClusterName: currentClusterName, - timerGate: timerGate, + timerGate: NewLocalTimerGate(), timerQueueProcessorBase: newTimerQueueProcessorBase(metrics.TimerActiveQueueProcessorScope, shard, historyService, timerQueueAckMgr, timeNow, logger), timerQueueAckMgr: timerQueueAckMgr, } diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go index 880afc47d06..ba504bf85ec 100644 --- a/service/history/timerQueueProcessorBase.go +++ b/service/history/timerQueueProcessorBase.go @@ -63,12 +63,15 @@ type ( timerFiredCount uint64 timerProcessor timerProcessor timerQueueAckMgr timerQueueAckMgr + rateLimiter common.TokenBucket // worker coroutines notification workerNotificationChans []chan struct{} // duplicate numOfWorker from config.TimerTaskWorkerCount for dynamic config works correctly numOfWorker int + lastPollTime time.Time + // timer notification newTimerCh chan struct{} newTimeLock sync.Mutex @@ -104,6 +107,8 @@ func newTimerQueueProcessorBase(scope int, shard ShardContext, historyService *h numOfWorker: numOfWorker, workerNotificationChans: workerNotificationChans, newTimerCh: make(chan struct{}, 1), + lastPollTime: time.Time{}, + rateLimiter: common.NewTokenBucket(shard.GetConfig().TimerProcessorMaxPollRPS(), common.NewRealTimeSource()), } return base @@ -115,6 +120,8 @@ func (t *timerQueueProcessorBase) Start() { } t.shutdownWG.Add(1) + // notify with the zero time to request an initial scan + t.notifyNewTimer(time.Time{}) go t.processorPump() t.logger.Info("Timer queue processor started.") @@ -303,6 +310,10 @@ func (t *timerQueueProcessorBase) notifyNewTimers(timerTasks []persistence.Task) } } + t.notifyNewTimer(newTime) +} + +func (t *timerQueueProcessorBase) notifyNewTimer(newTime time.Time) { t.newTimeLock.Lock() defer t.newTimeLock.Unlock() if t.newTime.IsZero() || newTime.Before(t.newTime) { @@ -319,7 +330,6 @@ func (t *timerQueueProcessorBase) 
notifyNewTimers(timerTasks []persistence.Task) func (t *timerQueueProcessorBase) internalProcessor() error { timerGate := t.timerProcessor.getTimerGate() jitter := backoff.NewJitter() - lastPollTime := time.Time{} pollTimer := time.NewTimer(jitter.JitDuration( t.config.TimerProcessorMaxPollInterval(), t.config.TimerProcessorMaxPollIntervalJitterCoefficient(), @@ -327,97 +337,83 @@ func (t *timerQueueProcessorBase) internalProcessor() error { defer pollTimer.Stop() updateAckChan := time.NewTicker(t.shard.GetConfig().TimerProcessorUpdateAckInterval()).C - var nextKeyTask *persistence.TimerTaskInfo -continueProcessor: for { - now := t.now() - if nextKeyTask == nil || timerGate.FireAfter(now) { - // Wait until one of four things occurs: - // 1. we get notified of a new message - // 2. the timer gate fires (message scheduled to be delivered) - // 3. shutdown was triggered. - // 4. updating ack level - // - select { - case <-t.shutdownCh: - t.logger.Debug("Timer queue processor pump shutting down.") - return nil - case <-t.timerQueueAckMgr.getFinishedChan(): - // timer queue ack manager indicate that all task scanned - // are finished and no more tasks - // use a separate gorouting since the caller hold the shutdownWG - go t.Stop() - return nil - case <-timerGate.FireChan(): - // Timer Fired. - case <-pollTimer.C: - pollTimer.Reset(jitter.JitDuration( - t.config.TimerProcessorMaxPollInterval(), - t.config.TimerProcessorMaxPollIntervalJitterCoefficient(), - )) - if !lastPollTime.Add(t.config.TimerProcessorMaxPollInterval()).Before(time.Now()) { - continue continueProcessor - } - case <-updateAckChan: - t.timerQueueAckMgr.updateAckLevel() - continue continueProcessor - case <-t.newTimerCh: - t.newTimeLock.Lock() - newTime := t.newTime - t.newTime = emptyTime - t.newTimeLock.Unlock() - // New Timer has arrived. 
- t.metricsClient.IncCounter(t.scope, metrics.NewTimerNotifyCounter) - t.logger.Debugf("Woke up by the timer") - - if timerGate.Update(newTime) { - // this means timer is updated, to the new time provided - // reset the nextKeyTask as the new timer is expected to fire before previously read nextKeyTask - nextKeyTask = nil + // Wait until one of the following occurs: + // 1. shutdown is triggered, or the ack manager reports all tasks finished + // 2. the timer gate fires (message scheduled to be delivered) + // 3. the max poll interval timer fires + // 4. the ack level update ticker fires + // 5. we get notified of a new timer + select { + case <-t.shutdownCh: + t.logger.Debug("Timer queue processor pump shutting down.") + return nil + case <-t.timerQueueAckMgr.getFinishedChan(): + // timer queue ack manager indicates that all tasks scanned + // are finished and there are no more tasks + // use a separate goroutine since the caller holds the shutdownWG + go t.Stop() + return nil + case <-timerGate.FireChan(): + lookAheadTimer, err := t.readAndFanoutTimerTasks() + if err != nil { + return err + } + if lookAheadTimer != nil { + timerGate.Update(lookAheadTimer.VisibilityTimestamp) + } + case <-pollTimer.C: + pollTimer.Reset(jitter.JitDuration( + t.config.TimerProcessorMaxPollInterval(), + t.config.TimerProcessorMaxPollIntervalJitterCoefficient(), + )) + if t.lastPollTime.Add(t.config.TimerProcessorMaxPollInterval()).Before(time.Now()) { + lookAheadTimer, err := t.readAndFanoutTimerTasks() + if err != nil { + return err } - - now = t.now() - t.logger.Debugf("%v: Next key after woke up by timer: %v", now.UTC(), newTime.UTC()) - if timerGate.FireAfter(now) { - continue continueProcessor + if lookAheadTimer != nil { + timerGate.Update(lookAheadTimer.VisibilityTimestamp) } } - } - - lastPollTime = time.Now() - var err error - nextKeyTask, err = t.readAndFanoutTimerTasks() - if err != nil { - return err - } - - if nextKeyTask != nil { - nextKey := TimerSequenceID{VisibilityTimestamp: nextKeyTask.VisibilityTimestamp, TaskID: nextKeyTask.TaskID} - t.logger.Debugf("%s: GetNextKey: %s", 
time.Now(), nextKey) - - timerGate.Update(nextKey.VisibilityTimestamp) + case <-updateAckChan: + t.timerQueueAckMgr.updateAckLevel() + case <-t.newTimerCh: + t.newTimeLock.Lock() + newTime := t.newTime + t.newTime = emptyTime + t.newTimeLock.Unlock() + // New Timer has arrived. + t.metricsClient.IncCounter(t.scope, metrics.NewTimerNotifyCounter) + timerGate.Update(newTime) } } } func (t *timerQueueProcessorBase) readAndFanoutTimerTasks() (*persistence.TimerTaskInfo, error) { - for { - // Get next set of timer tasks. - timerTasks, lookAheadTask, moreTasks, err := t.timerQueueAckMgr.readTimerTasks() - if err != nil { - return nil, err - } + if !t.rateLimiter.Consume(1, t.shard.GetConfig().TimerProcessorMaxPollInterval()) { + t.notifyNewTimer(time.Time{}) // rate limited: re-enqueue the event so the scan is retried + return nil, nil + } - for _, task := range timerTasks { - // We have a timer to fire. - t.tasksCh <- task - } + t.lastPollTime = time.Now() + timerTasks, lookAheadTask, moreTasks, err := t.timerQueueAckMgr.readTimerTasks() + if err != nil { + return nil, err + } - if !moreTasks { - return lookAheadTask, nil - } + for _, task := range timerTasks { + // We have a timer to fire. 
+ t.tasksCh <- task } + + if !moreTasks { + return lookAheadTask, nil + } + + t.notifyNewTimer(time.Time{}) // re-enqueue the event + return nil, nil } func (t *timerQueueProcessorBase) retryTasks() { diff --git a/service/history/timerQueueStandbyProcessor.go b/service/history/timerQueueStandbyProcessor.go index 62f35550a4c..d3b27a8f197 100644 --- a/service/history/timerQueueStandbyProcessor.go +++ b/service/history/timerQueueStandbyProcessor.go @@ -59,8 +59,6 @@ func newTimerQueueStandbyProcessor(shard ShardContext, historyService *historyEn } timerGate := NewRemoteTimerGate() - // this will trigger a timer gate fire event immediately - timerGate.Update(time.Time{}) timerGate.SetCurrentTime(shard.GetCurrentTime(clusterName)) timerQueueAckMgr := newTimerQueueAckMgr(metrics.TimerStandbyQueueProcessorScope, shard, historyService.metricsClient, clusterName, logger) processor := &timerQueueStandbyProcessorImpl{ @@ -71,7 +69,7 @@ func newTimerQueueStandbyProcessor(shard ShardContext, historyService *historyEn logger: logger, metricsClient: historyService.metricsClient, clusterName: clusterName, - timerGate: NewRemoteTimerGate(), + timerGate: timerGate, timerQueueProcessorBase: newTimerQueueProcessorBase(metrics.TimerStandbyQueueProcessorScope, shard, historyService, timerQueueAckMgr, timeNow, logger), timerQueueAckMgr: timerQueueAckMgr, }