Skip to content

Commit

Permalink
Add jitter to ack level update (#999)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored and samarabbas committed Jul 26, 2018
1 parent 2912570 commit 9905bf7
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 180 deletions.
105 changes: 57 additions & 48 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,54 +72,57 @@ var keys = map[Key]string{
MatchingRPS: "matching.rps",

// history settings
HistoryPersistenceMaxQPS: "history.persistenceMaxQPS",
HistoryLongPollExpirationInterval: "history.longPollExpirationInterval",
HistoryCacheInitialSize: "history.cacheInitialSize",
HistoryCacheMaxSize: "history.cacheMaxSize",
HistoryCacheTTL: "history.cacheTTL",
AcquireShardInterval: "history.acquireShardInterval",
StandbyClusterDelay: "history.standbyClusterDelay",
TimerTaskBatchSize: "history.timerTaskBatchSize",
TimerTaskWorkerCount: "history.timerTaskWorkerCount",
TimerTaskMaxRetryCount: "history.timerTaskMaxRetryCount",
TimerProcessorStartDelay: "history.timerProcessorStartDelay",
TimerProcessorFailoverStartDelay: "history.timerProcessorFailoverStartDelay",
TimerProcessorGetFailureRetryCount: "history.timerProcessorGetFailureRetryCount",
TimerProcessorCompleteTimerFailureRetryCount: "history.timerProcessorCompleteTimerFailureRetryCount",
TimerProcessorUpdateShardTaskCount: "history.timerProcessorUpdateShardTaskCount",
TimerProcessorUpdateAckInterval: "history.timerProcessorUpdateAckInterval",
TimerProcessorCompleteTimerInterval: "history.timerProcessorCompleteTimerInterval",
TimerProcessorFailoverMaxPollRPS: "history.timerProcessorFailoverMaxPollRPS",
TimerProcessorMaxPollRPS: "history.timerProcessorMaxPollRPS",
TimerProcessorMaxPollInterval: "history.timerProcessorMaxPollInterval",
TimerProcessorMaxPollIntervalJitterCoefficient: "history.timerProcessorMaxPollIntervalJitterCoefficient",
TransferTaskBatchSize: "history.transferTaskBatchSize",
TransferProcessorFailoverMaxPollRPS: "history.transferProcessorFailoverMaxPollRPS",
TransferProcessorMaxPollRPS: "history.transferProcessorMaxPollRPS",
TransferTaskWorkerCount: "history.transferTaskWorkerCount",
TransferTaskMaxRetryCount: "history.transferTaskMaxRetryCount",
TransferProcessorStartDelay: "history.transferProcessorStartDelay",
TransferProcessorFailoverStartDelay: "history.transferProcessorFailoverStartDelay",
TransferProcessorCompleteTransferFailureRetryCount: "history.transferProcessorCompleteTransferFailureRetryCount",
TransferProcessorUpdateShardTaskCount: "history.transferProcessorUpdateShardTaskCount",
TransferProcessorMaxPollInterval: "history.transferProcessorMaxPollInterval",
TransferProcessorMaxPollIntervalJitterCoefficient: "history.transferProcessorMaxPollIntervalJitterCoefficient",
TransferProcessorUpdateAckInterval: "history.transferProcessorUpdateAckInterval",
TransferProcessorCompleteTransferInterval: "history.transferProcessorCompleteTransferInterval",
ReplicatorTaskBatchSize: "history.replicatorTaskBatchSize",
ReplicatorTaskWorkerCount: "history.replicatorTaskWorkerCount",
ReplicatorTaskMaxRetryCount: "history.replicatorTaskMaxRetryCount",
ReplicatorProcessorStartDelay: "history.replicatorProcessorStartDelay",
ReplicatorProcessorMaxPollRPS: "history.replicatorProcessorMaxPollRPS",
ReplicatorProcessorUpdateShardTaskCount: "history.replicatorProcessorUpdateShardTaskCount",
ReplicatorProcessorMaxPollInterval: "history.replicatorProcessorMaxPollInterval",
ReplicatorProcessorMaxPollIntervalJitterCoefficient: "history.replicatorProcessorMaxPollIntervalJitterCoefficient",
ReplicatorProcessorUpdateAckInterval: "history.replicatorProcessorUpdateAckInterval",
ExecutionMgrNumConns: "history.executionMgrNumConns",
HistoryMgrNumConns: "history.historyMgrNumConns",
MaximumBufferedEventsBatch: "history.maximumBufferedEventsBatch",
ShardUpdateMinInterval: "history.shardUpdateMinInterval",
ShardSyncMinInterval: "history.shardSyncMinInterval",
HistoryPersistenceMaxQPS: "history.persistenceMaxQPS",
HistoryLongPollExpirationInterval: "history.longPollExpirationInterval",
HistoryCacheInitialSize: "history.cacheInitialSize",
HistoryCacheMaxSize: "history.cacheMaxSize",
HistoryCacheTTL: "history.cacheTTL",
AcquireShardInterval: "history.acquireShardInterval",
StandbyClusterDelay: "history.standbyClusterDelay",
TimerTaskBatchSize: "history.timerTaskBatchSize",
TimerTaskWorkerCount: "history.timerTaskWorkerCount",
TimerTaskMaxRetryCount: "history.timerTaskMaxRetryCount",
TimerProcessorStartDelay: "history.timerProcessorStartDelay",
TimerProcessorFailoverStartDelay: "history.timerProcessorFailoverStartDelay",
TimerProcessorGetFailureRetryCount: "history.timerProcessorGetFailureRetryCount",
TimerProcessorCompleteTimerFailureRetryCount: "history.timerProcessorCompleteTimerFailureRetryCount",
TimerProcessorUpdateShardTaskCount: "history.timerProcessorUpdateShardTaskCount",
TimerProcessorUpdateAckInterval: "history.timerProcessorUpdateAckInterval",
TimerProcessorUpdateAckIntervalJitterCoefficient: "history.timerProcessorUpdateAckIntervalJitterCoefficient",
TimerProcessorCompleteTimerInterval: "history.timerProcessorCompleteTimerInterval",
TimerProcessorFailoverMaxPollRPS: "history.timerProcessorFailoverMaxPollRPS",
TimerProcessorMaxPollRPS: "history.timerProcessorMaxPollRPS",
TimerProcessorMaxPollInterval: "history.timerProcessorMaxPollInterval",
TimerProcessorMaxPollIntervalJitterCoefficient: "history.timerProcessorMaxPollIntervalJitterCoefficient",
TransferTaskBatchSize: "history.transferTaskBatchSize",
TransferProcessorFailoverMaxPollRPS: "history.transferProcessorFailoverMaxPollRPS",
TransferProcessorMaxPollRPS: "history.transferProcessorMaxPollRPS",
TransferTaskWorkerCount: "history.transferTaskWorkerCount",
TransferTaskMaxRetryCount: "history.transferTaskMaxRetryCount",
TransferProcessorStartDelay: "history.transferProcessorStartDelay",
TransferProcessorFailoverStartDelay: "history.transferProcessorFailoverStartDelay",
TransferProcessorCompleteTransferFailureRetryCount: "history.transferProcessorCompleteTransferFailureRetryCount",
TransferProcessorUpdateShardTaskCount: "history.transferProcessorUpdateShardTaskCount",
TransferProcessorMaxPollInterval: "history.transferProcessorMaxPollInterval",
TransferProcessorMaxPollIntervalJitterCoefficient: "history.transferProcessorMaxPollIntervalJitterCoefficient",
TransferProcessorUpdateAckInterval: "history.transferProcessorUpdateAckInterval",
TransferProcessorUpdateAckIntervalJitterCoefficient: "history.transferProcessorUpdateAckIntervalJitterCoefficient",
TransferProcessorCompleteTransferInterval: "history.transferProcessorCompleteTransferInterval",
ReplicatorTaskBatchSize: "history.replicatorTaskBatchSize",
ReplicatorTaskWorkerCount: "history.replicatorTaskWorkerCount",
ReplicatorTaskMaxRetryCount: "history.replicatorTaskMaxRetryCount",
ReplicatorProcessorStartDelay: "history.replicatorProcessorStartDelay",
ReplicatorProcessorMaxPollRPS: "history.replicatorProcessorMaxPollRPS",
ReplicatorProcessorUpdateShardTaskCount: "history.replicatorProcessorUpdateShardTaskCount",
ReplicatorProcessorMaxPollInterval: "history.replicatorProcessorMaxPollInterval",
ReplicatorProcessorMaxPollIntervalJitterCoefficient: "history.replicatorProcessorMaxPollIntervalJitterCoefficient",
ReplicatorProcessorUpdateAckInterval: "history.replicatorProcessorUpdateAckInterval",
ReplicatorProcessorUpdateAckIntervalJitterCoefficient: "history.replicatorProcessorUpdateAckIntervalJitterCoefficient",
ExecutionMgrNumConns: "history.executionMgrNumConns",
HistoryMgrNumConns: "history.historyMgrNumConns",
MaximumBufferedEventsBatch: "history.maximumBufferedEventsBatch",
ShardUpdateMinInterval: "history.shardUpdateMinInterval",
ShardSyncMinInterval: "history.shardSyncMinInterval",

// worker settings
WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
Expand Down Expand Up @@ -217,6 +220,8 @@ const (
TimerProcessorUpdateShardTaskCount
// TimerProcessorUpdateAckInterval is update interval for timer processor
TimerProcessorUpdateAckInterval
// TimerProcessorUpdateAckIntervalJitterCoefficient is the jitter coefficient applied to TimerProcessorUpdateAckInterval
TimerProcessorUpdateAckIntervalJitterCoefficient
// TimerProcessorCompleteTimerInterval is complete timer interval for timer processor
TimerProcessorCompleteTimerInterval
// TimerProcessorFailoverMaxPollRPS is max poll rate per second for timer processor
Expand Down Expand Up @@ -251,6 +256,8 @@ const (
TransferProcessorMaxPollIntervalJitterCoefficient
// TransferProcessorUpdateAckInterval is update interval for transferQueueProcessor
TransferProcessorUpdateAckInterval
// TransferProcessorUpdateAckIntervalJitterCoefficient is the jitter coefficient applied to TransferProcessorUpdateAckInterval
TransferProcessorUpdateAckIntervalJitterCoefficient
// TransferProcessorCompleteTransferInterval is complete transfer interval for transferQueueProcessor
TransferProcessorCompleteTransferInterval
// ReplicatorTaskBatchSize is batch size for ReplicatorProcessor
Expand All @@ -271,6 +278,8 @@ const (
ReplicatorProcessorMaxPollIntervalJitterCoefficient
// ReplicatorProcessorUpdateAckInterval is update interval for ReplicatorProcessor
ReplicatorProcessorUpdateAckInterval
// ReplicatorProcessorUpdateAckIntervalJitterCoefficient is the jitter coefficient applied to ReplicatorProcessorUpdateAckInterval
ReplicatorProcessorUpdateAckIntervalJitterCoefficient
// ExecutionMgrNumConns is persistence connections number for ExecutionManager
ExecutionMgrNumConns
// HistoryMgrNumConns is persistence connections number for HistoryManager
Expand Down
42 changes: 28 additions & 14 deletions service/history/queueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ import (
type (
// QueueProcessorOptions is options passed to queue processor implementation
QueueProcessorOptions struct {
StartDelay dynamicconfig.DurationPropertyFn
BatchSize dynamicconfig.IntPropertyFn
WorkerCount dynamicconfig.IntPropertyFn
MaxPollRPS dynamicconfig.IntPropertyFn
MaxPollInterval dynamicconfig.DurationPropertyFn
MaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
UpdateAckInterval dynamicconfig.DurationPropertyFn
MaxRetryCount dynamicconfig.IntPropertyFn
MetricScope int
StartDelay dynamicconfig.DurationPropertyFn
BatchSize dynamicconfig.IntPropertyFn
WorkerCount dynamicconfig.IntPropertyFn
MaxPollRPS dynamicconfig.IntPropertyFn
MaxPollInterval dynamicconfig.DurationPropertyFn
MaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
UpdateAckInterval dynamicconfig.DurationPropertyFn
UpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
MaxRetryCount dynamicconfig.IntPropertyFn
MetricScope int
}

queueProcessorBase struct {
Expand Down Expand Up @@ -155,11 +156,17 @@ func (p *queueProcessorBase) processorPump() {
}

jitter := backoff.NewJitter()
pollTimer := time.NewTimer(jitter.JitDuration(p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient()))
pollTimer := time.NewTimer(jitter.JitDuration(
p.options.MaxPollInterval(),
p.options.MaxPollIntervalJitterCoefficient(),
))
defer pollTimer.Stop()

updateAckTicker := time.NewTicker(p.options.UpdateAckInterval())
defer updateAckTicker.Stop()
updateAckTimer := time.NewTimer(jitter.JitDuration(
p.options.UpdateAckInterval(),
p.options.UpdateAckIntervalJitterCoefficient(),
))
defer updateAckTimer.Stop()

processorPumpLoop:
for {
Expand All @@ -172,11 +179,18 @@ processorPumpLoop:
case <-p.notifyCh:
p.processBatch(tasksCh)
case <-pollTimer.C:
pollTimer.Reset(jitter.JitDuration(p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient()))
pollTimer.Reset(jitter.JitDuration(
p.options.MaxPollInterval(),
p.options.MaxPollIntervalJitterCoefficient(),
))
if p.lastPollTime.Add(p.options.MaxPollInterval()).Before(time.Now()) {
p.processBatch(tasksCh)
}
case <-updateAckTicker.C:
case <-updateAckTimer.C:
updateAckTimer.Reset(jitter.JitDuration(
p.options.UpdateAckInterval(),
p.options.UpdateAckIntervalJitterCoefficient(),
))
p.ackMgr.updateQueueAckLevel()
}
}
Expand Down
19 changes: 10 additions & 9 deletions service/history/replicatorQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,16 @@ func newReplicatorQueueProcessor(shard ShardContext, replicator messaging.Produc

config := shard.GetConfig()
options := &QueueProcessorOptions{
StartDelay: config.ReplicatorProcessorStartDelay,
BatchSize: config.ReplicatorTaskBatchSize,
WorkerCount: config.ReplicatorTaskWorkerCount,
MaxPollRPS: config.ReplicatorProcessorMaxPollRPS,
MaxPollInterval: config.ReplicatorProcessorMaxPollInterval,
MaxPollIntervalJitterCoefficient: config.ReplicatorProcessorMaxPollIntervalJitterCoefficient,
UpdateAckInterval: config.ReplicatorProcessorUpdateAckInterval,
MaxRetryCount: config.ReplicatorTaskMaxRetryCount,
MetricScope: metrics.ReplicatorQueueProcessorScope,
StartDelay: config.ReplicatorProcessorStartDelay,
BatchSize: config.ReplicatorTaskBatchSize,
WorkerCount: config.ReplicatorTaskWorkerCount,
MaxPollRPS: config.ReplicatorProcessorMaxPollRPS,
MaxPollInterval: config.ReplicatorProcessorMaxPollInterval,
MaxPollIntervalJitterCoefficient: config.ReplicatorProcessorMaxPollIntervalJitterCoefficient,
UpdateAckInterval: config.ReplicatorProcessorUpdateAckInterval,
UpdateAckIntervalJitterCoefficient: config.ReplicatorProcessorUpdateAckIntervalJitterCoefficient,
MaxRetryCount: config.ReplicatorTaskMaxRetryCount,
MetricScope: metrics.ReplicatorQueueProcessorScope,
}

logger = logger.WithFields(bark.Fields{
Expand Down
Loading

0 comments on commit 9905bf7

Please sign in to comment.