Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add jitter to ack level update #999

Merged
merged 1 commit into from
Jul 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 57 additions & 48 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,54 +72,57 @@ var keys = map[Key]string{
MatchingRPS: "matching.rps",

// history settings
HistoryPersistenceMaxQPS: "history.persistenceMaxQPS",
HistoryLongPollExpirationInterval: "history.longPollExpirationInterval",
HistoryCacheInitialSize: "history.cacheInitialSize",
HistoryCacheMaxSize: "history.cacheMaxSize",
HistoryCacheTTL: "history.cacheTTL",
AcquireShardInterval: "history.acquireShardInterval",
StandbyClusterDelay: "history.standbyClusterDelay",
TimerTaskBatchSize: "history.timerTaskBatchSize",
TimerTaskWorkerCount: "history.timerTaskWorkerCount",
TimerTaskMaxRetryCount: "history.timerTaskMaxRetryCount",
TimerProcessorStartDelay: "history.timerProcessorStartDelay",
TimerProcessorFailoverStartDelay: "history.timerProcessorFailoverStartDelay",
TimerProcessorGetFailureRetryCount: "history.timerProcessorGetFailureRetryCount",
TimerProcessorCompleteTimerFailureRetryCount: "history.timerProcessorCompleteTimerFailureRetryCount",
TimerProcessorUpdateShardTaskCount: "history.timerProcessorUpdateShardTaskCount",
TimerProcessorUpdateAckInterval: "history.timerProcessorUpdateAckInterval",
TimerProcessorCompleteTimerInterval: "history.timerProcessorCompleteTimerInterval",
TimerProcessorFailoverMaxPollRPS: "history.timerProcessorFailoverMaxPollRPS",
TimerProcessorMaxPollRPS: "history.timerProcessorMaxPollRPS",
TimerProcessorMaxPollInterval: "history.timerProcessorMaxPollInterval",
TimerProcessorMaxPollIntervalJitterCoefficient: "history.timerProcessorMaxPollIntervalJitterCoefficient",
TransferTaskBatchSize: "history.transferTaskBatchSize",
TransferProcessorFailoverMaxPollRPS: "history.transferProcessorFailoverMaxPollRPS",
TransferProcessorMaxPollRPS: "history.transferProcessorMaxPollRPS",
TransferTaskWorkerCount: "history.transferTaskWorkerCount",
TransferTaskMaxRetryCount: "history.transferTaskMaxRetryCount",
TransferProcessorStartDelay: "history.transferProcessorStartDelay",
TransferProcessorFailoverStartDelay: "history.transferProcessorFailoverStartDelay",
TransferProcessorCompleteTransferFailureRetryCount: "history.transferProcessorCompleteTransferFailureRetryCount",
TransferProcessorUpdateShardTaskCount: "history.transferProcessorUpdateShardTaskCount",
TransferProcessorMaxPollInterval: "history.transferProcessorMaxPollInterval",
TransferProcessorMaxPollIntervalJitterCoefficient: "history.transferProcessorMaxPollIntervalJitterCoefficient",
TransferProcessorUpdateAckInterval: "history.transferProcessorUpdateAckInterval",
TransferProcessorCompleteTransferInterval: "history.transferProcessorCompleteTransferInterval",
ReplicatorTaskBatchSize: "history.replicatorTaskBatchSize",
ReplicatorTaskWorkerCount: "history.replicatorTaskWorkerCount",
ReplicatorTaskMaxRetryCount: "history.replicatorTaskMaxRetryCount",
ReplicatorProcessorStartDelay: "history.replicatorProcessorStartDelay",
ReplicatorProcessorMaxPollRPS: "history.replicatorProcessorMaxPollRPS",
ReplicatorProcessorUpdateShardTaskCount: "history.replicatorProcessorUpdateShardTaskCount",
ReplicatorProcessorMaxPollInterval: "history.replicatorProcessorMaxPollInterval",
ReplicatorProcessorMaxPollIntervalJitterCoefficient: "history.replicatorProcessorMaxPollIntervalJitterCoefficient",
ReplicatorProcessorUpdateAckInterval: "history.replicatorProcessorUpdateAckInterval",
ExecutionMgrNumConns: "history.executionMgrNumConns",
HistoryMgrNumConns: "history.historyMgrNumConns",
MaximumBufferedEventsBatch: "history.maximumBufferedEventsBatch",
ShardUpdateMinInterval: "history.shardUpdateMinInterval",
ShardSyncMinInterval: "history.shardSyncMinInterval",
HistoryPersistenceMaxQPS: "history.persistenceMaxQPS",
HistoryLongPollExpirationInterval: "history.longPollExpirationInterval",
HistoryCacheInitialSize: "history.cacheInitialSize",
HistoryCacheMaxSize: "history.cacheMaxSize",
HistoryCacheTTL: "history.cacheTTL",
AcquireShardInterval: "history.acquireShardInterval",
StandbyClusterDelay: "history.standbyClusterDelay",
TimerTaskBatchSize: "history.timerTaskBatchSize",
TimerTaskWorkerCount: "history.timerTaskWorkerCount",
TimerTaskMaxRetryCount: "history.timerTaskMaxRetryCount",
TimerProcessorStartDelay: "history.timerProcessorStartDelay",
TimerProcessorFailoverStartDelay: "history.timerProcessorFailoverStartDelay",
TimerProcessorGetFailureRetryCount: "history.timerProcessorGetFailureRetryCount",
TimerProcessorCompleteTimerFailureRetryCount: "history.timerProcessorCompleteTimerFailureRetryCount",
TimerProcessorUpdateShardTaskCount: "history.timerProcessorUpdateShardTaskCount",
TimerProcessorUpdateAckInterval: "history.timerProcessorUpdateAckInterval",
TimerProcessorUpdateAckIntervalJitterCoefficient: "history.timerProcessorUpdateAckIntervalJitterCoefficient",
TimerProcessorCompleteTimerInterval: "history.timerProcessorCompleteTimerInterval",
TimerProcessorFailoverMaxPollRPS: "history.timerProcessorFailoverMaxPollRPS",
TimerProcessorMaxPollRPS: "history.timerProcessorMaxPollRPS",
TimerProcessorMaxPollInterval: "history.timerProcessorMaxPollInterval",
TimerProcessorMaxPollIntervalJitterCoefficient: "history.timerProcessorMaxPollIntervalJitterCoefficient",
TransferTaskBatchSize: "history.transferTaskBatchSize",
TransferProcessorFailoverMaxPollRPS: "history.transferProcessorFailoverMaxPollRPS",
TransferProcessorMaxPollRPS: "history.transferProcessorMaxPollRPS",
TransferTaskWorkerCount: "history.transferTaskWorkerCount",
TransferTaskMaxRetryCount: "history.transferTaskMaxRetryCount",
TransferProcessorStartDelay: "history.transferProcessorStartDelay",
TransferProcessorFailoverStartDelay: "history.transferProcessorFailoverStartDelay",
TransferProcessorCompleteTransferFailureRetryCount: "history.transferProcessorCompleteTransferFailureRetryCount",
TransferProcessorUpdateShardTaskCount: "history.transferProcessorUpdateShardTaskCount",
TransferProcessorMaxPollInterval: "history.transferProcessorMaxPollInterval",
TransferProcessorMaxPollIntervalJitterCoefficient: "history.transferProcessorMaxPollIntervalJitterCoefficient",
TransferProcessorUpdateAckInterval: "history.transferProcessorUpdateAckInterval",
TransferProcessorUpdateAckIntervalJitterCoefficient: "history.transferProcessorUpdateAckIntervalJitterCoefficient",
TransferProcessorCompleteTransferInterval: "history.transferProcessorCompleteTransferInterval",
ReplicatorTaskBatchSize: "history.replicatorTaskBatchSize",
ReplicatorTaskWorkerCount: "history.replicatorTaskWorkerCount",
ReplicatorTaskMaxRetryCount: "history.replicatorTaskMaxRetryCount",
ReplicatorProcessorStartDelay: "history.replicatorProcessorStartDelay",
ReplicatorProcessorMaxPollRPS: "history.replicatorProcessorMaxPollRPS",
ReplicatorProcessorUpdateShardTaskCount: "history.replicatorProcessorUpdateShardTaskCount",
ReplicatorProcessorMaxPollInterval: "history.replicatorProcessorMaxPollInterval",
ReplicatorProcessorMaxPollIntervalJitterCoefficient: "history.replicatorProcessorMaxPollIntervalJitterCoefficient",
ReplicatorProcessorUpdateAckInterval: "history.replicatorProcessorUpdateAckInterval",
ReplicatorProcessorUpdateAckIntervalJitterCoefficient: "history.replicatorProcessorUpdateAckIntervalJitterCoefficient",
ExecutionMgrNumConns: "history.executionMgrNumConns",
HistoryMgrNumConns: "history.historyMgrNumConns",
MaximumBufferedEventsBatch: "history.maximumBufferedEventsBatch",
ShardUpdateMinInterval: "history.shardUpdateMinInterval",
ShardSyncMinInterval: "history.shardSyncMinInterval",

// worker settings
WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
Expand Down Expand Up @@ -217,6 +220,8 @@ const (
TimerProcessorUpdateShardTaskCount
// TimerProcessorUpdateAckInterval is update interval for timer processor
TimerProcessorUpdateAckInterval
// TimerProcessorUpdateAckIntervalJitterCoefficient is the jitter coefficient applied to the ack level update interval for timer processor
TimerProcessorUpdateAckIntervalJitterCoefficient
// TimerProcessorCompleteTimerInterval is complete timer interval for timer processor
TimerProcessorCompleteTimerInterval
// TimerProcessorFailoverMaxPollRPS is max poll rate per second for timer processor
Expand Down Expand Up @@ -251,6 +256,8 @@ const (
TransferProcessorMaxPollIntervalJitterCoefficient
// TransferProcessorUpdateAckInterval is update interval for transferQueueProcessor
TransferProcessorUpdateAckInterval
// TransferProcessorUpdateAckIntervalJitterCoefficient is the jitter coefficient applied to the ack level update interval for transferQueueProcessor
TransferProcessorUpdateAckIntervalJitterCoefficient
// TransferProcessorCompleteTransferInterval is complete transfer interval for transferQueueProcessor
TransferProcessorCompleteTransferInterval
// ReplicatorTaskBatchSize is batch size for ReplicatorProcessor
Expand All @@ -271,6 +278,8 @@ const (
ReplicatorProcessorMaxPollIntervalJitterCoefficient
// ReplicatorProcessorUpdateAckInterval is update interval for ReplicatorProcessor
ReplicatorProcessorUpdateAckInterval
// ReplicatorProcessorUpdateAckIntervalJitterCoefficient is the jitter coefficient applied to the ack level update interval for ReplicatorProcessor
ReplicatorProcessorUpdateAckIntervalJitterCoefficient
// ExecutionMgrNumConns is persistence connections number for ExecutionManager
ExecutionMgrNumConns
// HistoryMgrNumConns is persistence connections number for HistoryManager
Expand Down
42 changes: 28 additions & 14 deletions service/history/queueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ import (
type (
// QueueProcessorOptions is options passed to queue processor implementation
QueueProcessorOptions struct {
StartDelay dynamicconfig.DurationPropertyFn
BatchSize dynamicconfig.IntPropertyFn
WorkerCount dynamicconfig.IntPropertyFn
MaxPollRPS dynamicconfig.IntPropertyFn
MaxPollInterval dynamicconfig.DurationPropertyFn
MaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
UpdateAckInterval dynamicconfig.DurationPropertyFn
MaxRetryCount dynamicconfig.IntPropertyFn
MetricScope int
StartDelay dynamicconfig.DurationPropertyFn
BatchSize dynamicconfig.IntPropertyFn
WorkerCount dynamicconfig.IntPropertyFn
MaxPollRPS dynamicconfig.IntPropertyFn
MaxPollInterval dynamicconfig.DurationPropertyFn
MaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
UpdateAckInterval dynamicconfig.DurationPropertyFn
UpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
MaxRetryCount dynamicconfig.IntPropertyFn
MetricScope int
}

queueProcessorBase struct {
Expand Down Expand Up @@ -155,11 +156,17 @@ func (p *queueProcessorBase) processorPump() {
}

jitter := backoff.NewJitter()
pollTimer := time.NewTimer(jitter.JitDuration(p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient()))
pollTimer := time.NewTimer(jitter.JitDuration(
p.options.MaxPollInterval(),
p.options.MaxPollIntervalJitterCoefficient(),
))
defer pollTimer.Stop()

updateAckTicker := time.NewTicker(p.options.UpdateAckInterval())
defer updateAckTicker.Stop()
updateAckTimer := time.NewTimer(jitter.JitDuration(
p.options.UpdateAckInterval(),
p.options.UpdateAckIntervalJitterCoefficient(),
))
defer updateAckTimer.Stop()

processorPumpLoop:
for {
Expand All @@ -172,11 +179,18 @@ processorPumpLoop:
case <-p.notifyCh:
p.processBatch(tasksCh)
case <-pollTimer.C:
pollTimer.Reset(jitter.JitDuration(p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient()))
pollTimer.Reset(jitter.JitDuration(
p.options.MaxPollInterval(),
p.options.MaxPollIntervalJitterCoefficient(),
))
if p.lastPollTime.Add(p.options.MaxPollInterval()).Before(time.Now()) {
p.processBatch(tasksCh)
}
case <-updateAckTicker.C:
case <-updateAckTimer.C:
updateAckTimer.Reset(jitter.JitDuration(
p.options.UpdateAckInterval(),
p.options.UpdateAckIntervalJitterCoefficient(),
))
p.ackMgr.updateQueueAckLevel()
}
}
Expand Down
19 changes: 10 additions & 9 deletions service/history/replicatorQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,16 @@ func newReplicatorQueueProcessor(shard ShardContext, replicator messaging.Produc

config := shard.GetConfig()
options := &QueueProcessorOptions{
StartDelay: config.ReplicatorProcessorStartDelay,
BatchSize: config.ReplicatorTaskBatchSize,
WorkerCount: config.ReplicatorTaskWorkerCount,
MaxPollRPS: config.ReplicatorProcessorMaxPollRPS,
MaxPollInterval: config.ReplicatorProcessorMaxPollInterval,
MaxPollIntervalJitterCoefficient: config.ReplicatorProcessorMaxPollIntervalJitterCoefficient,
UpdateAckInterval: config.ReplicatorProcessorUpdateAckInterval,
MaxRetryCount: config.ReplicatorTaskMaxRetryCount,
MetricScope: metrics.ReplicatorQueueProcessorScope,
StartDelay: config.ReplicatorProcessorStartDelay,
BatchSize: config.ReplicatorTaskBatchSize,
WorkerCount: config.ReplicatorTaskWorkerCount,
MaxPollRPS: config.ReplicatorProcessorMaxPollRPS,
MaxPollInterval: config.ReplicatorProcessorMaxPollInterval,
MaxPollIntervalJitterCoefficient: config.ReplicatorProcessorMaxPollIntervalJitterCoefficient,
UpdateAckInterval: config.ReplicatorProcessorUpdateAckInterval,
UpdateAckIntervalJitterCoefficient: config.ReplicatorProcessorUpdateAckIntervalJitterCoefficient,
MaxRetryCount: config.ReplicatorTaskMaxRetryCount,
MetricScope: metrics.ReplicatorQueueProcessorScope,
}

logger = logger.WithFields(bark.Fields{
Expand Down
Loading