Skip to content

Commit

Permalink
Batch of fixes (#927)
Browse files Browse the repository at this point in the history
* fix noisy log
* fix remote timer gate issue, making remote timer resilient to new timer in the past
* add more unit test
* refactor existing time processing event loop to make it less confusing
  • Loading branch information
wxing1292 authored Jul 7, 2018
1 parent 77a9979 commit fd7a1db
Show file tree
Hide file tree
Showing 9 changed files with 406 additions and 174 deletions.
3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ var keys = map[Key]string{
TimerProcessorUpdateShardTaskCount: "history.timerProcessorUpdateShardTaskCount",
TimerProcessorUpdateAckInterval: "history.timerProcessorUpdateAckInterval",
TimerProcessorCompleteTimerInterval: "history.timerProcessorCompleteTimerInterval",
TimerProcessorMaxPollRPS: "history.timerProcessorMaxPollRPS",
TimerProcessorMaxPollInterval: "history.timerProcessorMaxPollInterval",
TimerProcessorMaxPollIntervalJitterCoefficient: "history.timerProcessorMaxPollIntervalJitterCoefficient",
TimerProcessorStandbyTaskDelay: "history.timerProcessorStandbyTaskDelay",
Expand Down Expand Up @@ -190,6 +191,8 @@ const (
TimerProcessorUpdateAckInterval
// TimerProcessorCompleteTimerInterval is complete timer interval for timer processor
TimerProcessorCompleteTimerInterval
// TimerProcessorMaxPollRPS is max poll rate per second for timer processor
TimerProcessorMaxPollRPS
// TimerProcessorMaxPollInterval is max poll interval for timer processor
TimerProcessorMaxPollInterval
// TimerProcessorMaxPollIntervalJitterCoefficient is the max poll interval jitter coefficient
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (r *historyReplicator) ApplyStartEvent(context *workflowExecutionContext, r
msBuilder := r.getNewMutableState(request.GetVersion(), logger)
err := r.ApplyReplicationTask(context, msBuilder, request, logger)
if err != nil {
logger.Errorf("Fail to Apply Replication task. NextEvent: %v, FirstEvent: %v, Err: %v", msBuilder.GetNextEventID(),
logger.Debugf("Fail to Apply Replication task. NextEvent: %v, FirstEvent: %v, Err: %v", msBuilder.GetNextEventID(),
request.GetFirstEventId(), err)
}
return err
Expand Down
9 changes: 5 additions & 4 deletions service/history/queueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type (
// worker coroutines notification
workerNotificationChans []chan struct{}

lastPollTime time.Time

notifyCh chan struct{}
status int32
shutdownWG sync.WaitGroup
Expand Down Expand Up @@ -92,6 +94,7 @@ func newQueueProcessorBase(shard ShardContext, options *QueueProcessorOptions, p
metricsClient: shard.GetMetricsClient(),
logger: logger,
ackMgr: queueAckMgr,
lastPollTime: time.Time{},
}

return p
Expand Down Expand Up @@ -145,7 +148,6 @@ func (p *queueProcessorBase) processorPump() {
}

jitter := backoff.NewJitter()
lastPollTime := time.Time{}
pollTimer := time.NewTimer(jitter.JitDuration(p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient()))
updateAckTimer := time.NewTimer(p.options.UpdateAckInterval())

Expand All @@ -159,12 +161,10 @@ processorPumpLoop:
go p.Stop()
case <-p.notifyCh:
p.processBatch(tasksCh)
lastPollTime = time.Now()
case <-pollTimer.C:
pollTimer.Reset(jitter.JitDuration(p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient()))
if lastPollTime.Add(p.options.MaxPollInterval()).Before(time.Now()) {
if p.lastPollTime.Add(p.options.MaxPollInterval()).Before(time.Now()) {
p.processBatch(tasksCh)
lastPollTime = time.Now()
}
case <-updateAckTimer.C:
p.ackMgr.updateQueueAckLevel()
Expand All @@ -189,6 +189,7 @@ func (p *queueProcessorBase) processBatch(tasksCh chan<- queueTaskInfo) {
return
}

p.lastPollTime = time.Now()
tasks, more, err := p.ackMgr.readQueueTasks()

if err != nil {
Expand Down
8 changes: 5 additions & 3 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,17 @@ type Config struct {
TimerProcessorCompleteTimerFailureRetryCount dynamicconfig.IntPropertyFn
TimerProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
TimerProcessorCompleteTimerInterval dynamicconfig.DurationPropertyFn
TimerProcessorMaxPollRPS dynamicconfig.IntPropertyFn
TimerProcessorMaxPollInterval dynamicconfig.DurationPropertyFn
TimerProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
TimerProcessorStandbyTaskDelay dynamicconfig.DurationPropertyFn

// TransferQueueProcessor settings
TransferTaskBatchSize dynamicconfig.IntPropertyFn
TransferProcessorMaxPollRPS dynamicconfig.IntPropertyFn
TransferTaskWorkerCount dynamicconfig.IntPropertyFn
TransferTaskMaxRetryCount dynamicconfig.IntPropertyFn
TransferProcessorCompleteTransferFailureRetryCount dynamicconfig.IntPropertyFn
TransferProcessorMaxPollRPS dynamicconfig.IntPropertyFn
TransferProcessorMaxPollInterval dynamicconfig.DurationPropertyFn
TransferProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
TransferProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
Expand Down Expand Up @@ -111,11 +112,12 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config {
TimerProcessorCompleteTimerFailureRetryCount: dc.GetIntProperty(dynamicconfig.TimerProcessorCompleteTimerFailureRetryCount, 10),
TimerProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorUpdateAckInterval, 5*time.Second),
TimerProcessorCompleteTimerInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorCompleteTimerInterval, 3*time.Second),
TimerProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.TimerProcessorMaxPollRPS, 20),
TimerProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorMaxPollInterval, 5*time.Minute),
TimerProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TimerProcessorMaxPollIntervalJitterCoefficient, 0.15),
TimerProcessorStandbyTaskDelay: dc.GetDurationProperty(dynamicconfig.TimerProcessorStandbyTaskDelay, 0*time.Minute),
TransferTaskBatchSize: dc.GetIntProperty(dynamicconfig.TransferTaskBatchSize, 100),
TransferProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.TransferProcessorMaxPollRPS, 100),
TransferProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.TransferProcessorMaxPollRPS, 20),
TransferTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TransferTaskWorkerCount, 10),
TransferTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TransferTaskMaxRetryCount, 100),
TransferProcessorCompleteTransferFailureRetryCount: dc.GetIntProperty(dynamicconfig.TransferProcessorCompleteTransferFailureRetryCount, 10),
Expand All @@ -127,7 +129,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config {
ReplicatorTaskBatchSize: dc.GetIntProperty(dynamicconfig.ReplicatorTaskBatchSize, 100),
ReplicatorTaskWorkerCount: dc.GetIntProperty(dynamicconfig.ReplicatorTaskWorkerCount, 10),
ReplicatorTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.ReplicatorTaskMaxRetryCount, 100),
ReplicatorProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.ReplicatorProcessorMaxPollRPS, 100),
ReplicatorProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.ReplicatorProcessorMaxPollRPS, 20),
ReplicatorProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.ReplicatorProcessorMaxPollInterval, 1*time.Minute),
ReplicatorProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicatorProcessorMaxPollIntervalJitterCoefficient, 0.15),
ReplicatorProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.ReplicatorProcessorUpdateAckInterval, 5*time.Second),
Expand Down
82 changes: 60 additions & 22 deletions service/history/timerGate.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ type (

// lock for timer and next wake up time
sync.Mutex
// view of current time
currentTime time.Time
// time which timer will fire
nextWakeupTime time.Time
// nextWakeupTime already fired or not
nextWakeupTimeFired bool
}
)

Expand Down Expand Up @@ -155,9 +155,9 @@ func (timerGate *LocalTimerGateImpl) Close() {
// NewRemoteTimerGate create a new timer gate instance
func NewRemoteTimerGate() RemoteTimerGate {
timer := &RemoteTimerGateImpl{
nextWakeupTime: time.Time{},
nextWakeupTimeFired: true, // this should be true so update API can set the next timer
fireChan: make(chan struct{}, 1),
currentTime: time.Time{},
nextWakeupTime: time.Time{},
fireChan: make(chan struct{}, 1),
}
return timer
}
Expand All @@ -171,7 +171,9 @@ func (timerGate *RemoteTimerGateImpl) FireChan() <-chan struct{} {
func (timerGate *RemoteTimerGateImpl) FireAfter(now time.Time) bool {
timerGate.Lock()
defer timerGate.Unlock()
return timerGate.nextWakeupTime.After(now)

active := timerGate.currentTime.Before(timerGate.nextWakeupTime)
return active && timerGate.nextWakeupTime.After(now)
}

// Update update the timer gate, return true if update is a success
Expand All @@ -180,33 +182,69 @@ func (timerGate *RemoteTimerGateImpl) Update(nextTime time.Time) bool {
timerGate.Lock()
defer timerGate.Unlock()

if timerGate.nextWakeupTimeFired || timerGate.nextWakeupTime.After(nextTime) {
// if timer will not fire or next wake time is after the "next"
// then we need to update the timer to fire
active := timerGate.currentTime.Before(timerGate.nextWakeupTime)
if active {
if timerGate.nextWakeupTime.Before(nextTime) {
// current time < next wake up time < next time
return false
}

if timerGate.currentTime.Before(nextTime) {
// current time < next time <= next wake up time
timerGate.nextWakeupTime = nextTime
return true
}

// next time <= current time < next wake up time
timerGate.nextWakeupTime = nextTime
timerGate.nextWakeupTimeFired = false
// Notifies caller that next notification is reset to fire at passed in 'next' visibility time
timerGate.fire()
return true
}

return false
// this means the timer, before stopped, has already fired / never active
if !timerGate.currentTime.Before(nextTime) {
// next time is <= current time, need to fire immediately
// whether to update next wake up time or not is irrelevent
timerGate.fire()
} else {
// next time > current time
timerGate.nextWakeupTime = nextTime
}
return true
}

// SetCurrentTime set the current time, and additionally fire the fire chan
// if new "current" time is after the next wake up time, return true if
// "current" is antually updated
// "current" is actually updated
func (timerGate *RemoteTimerGateImpl) SetCurrentTime(currentTime time.Time) bool {
timerGate.Lock()
defer timerGate.Unlock()
if !timerGate.nextWakeupTimeFired && !currentTime.Before(timerGate.nextWakeupTime) {
timerGate.nextWakeupTimeFired = true
select {
case timerGate.fireChan <- struct{}{}:
// timer successfully triggered
default:
// timer already triggered, pass
}

if !timerGate.currentTime.Before(currentTime) {
// new current time is <= current time
return false
}

// NOTE: do not update the current time now
if !timerGate.currentTime.Before(timerGate.nextWakeupTime) {
// current time already >= next wakeup time
// avoid duplicate fire
timerGate.currentTime = currentTime
return true
}
return false

timerGate.currentTime = currentTime
if !timerGate.currentTime.Before(timerGate.nextWakeupTime) {
timerGate.fire()
}
return true
}

func (timerGate *RemoteTimerGateImpl) fire() {
select {
case timerGate.fireChan <- struct{}{}:
// timer successfully triggered
default:
// timer already triggered, pass
}
}
Loading

0 comments on commit fd7a1db

Please sign in to comment.