diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index 9283b681177..9061202b503 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -72,54 +72,57 @@ var keys = map[Key]string{ MatchingRPS: "matching.rps", // history settings - HistoryPersistenceMaxQPS: "history.persistenceMaxQPS", - HistoryLongPollExpirationInterval: "history.longPollExpirationInterval", - HistoryCacheInitialSize: "history.cacheInitialSize", - HistoryCacheMaxSize: "history.cacheMaxSize", - HistoryCacheTTL: "history.cacheTTL", - AcquireShardInterval: "history.acquireShardInterval", - StandbyClusterDelay: "history.standbyClusterDelay", - TimerTaskBatchSize: "history.timerTaskBatchSize", - TimerTaskWorkerCount: "history.timerTaskWorkerCount", - TimerTaskMaxRetryCount: "history.timerTaskMaxRetryCount", - TimerProcessorStartDelay: "history.timerProcessorStartDelay", - TimerProcessorFailoverStartDelay: "history.timerProcessorFailoverStartDelay", - TimerProcessorGetFailureRetryCount: "history.timerProcessorGetFailureRetryCount", - TimerProcessorCompleteTimerFailureRetryCount: "history.timerProcessorCompleteTimerFailureRetryCount", - TimerProcessorUpdateShardTaskCount: "history.timerProcessorUpdateShardTaskCount", - TimerProcessorUpdateAckInterval: "history.timerProcessorUpdateAckInterval", - TimerProcessorCompleteTimerInterval: "history.timerProcessorCompleteTimerInterval", - TimerProcessorFailoverMaxPollRPS: "history.timerProcessorFailoverMaxPollRPS", - TimerProcessorMaxPollRPS: "history.timerProcessorMaxPollRPS", - TimerProcessorMaxPollInterval: "history.timerProcessorMaxPollInterval", - TimerProcessorMaxPollIntervalJitterCoefficient: "history.timerProcessorMaxPollIntervalJitterCoefficient", - TransferTaskBatchSize: "history.transferTaskBatchSize", - TransferProcessorFailoverMaxPollRPS: "history.transferProcessorFailoverMaxPollRPS", - TransferProcessorMaxPollRPS: "history.transferProcessorMaxPollRPS", 
- TransferTaskWorkerCount: "history.transferTaskWorkerCount", - TransferTaskMaxRetryCount: "history.transferTaskMaxRetryCount", - TransferProcessorStartDelay: "history.transferProcessorStartDelay", - TransferProcessorFailoverStartDelay: "history.transferProcessorFailoverStartDelay", - TransferProcessorCompleteTransferFailureRetryCount: "history.transferProcessorCompleteTransferFailureRetryCount", - TransferProcessorUpdateShardTaskCount: "history.transferProcessorUpdateShardTaskCount", - TransferProcessorMaxPollInterval: "history.transferProcessorMaxPollInterval", - TransferProcessorMaxPollIntervalJitterCoefficient: "history.transferProcessorMaxPollIntervalJitterCoefficient", - TransferProcessorUpdateAckInterval: "history.transferProcessorUpdateAckInterval", - TransferProcessorCompleteTransferInterval: "history.transferProcessorCompleteTransferInterval", - ReplicatorTaskBatchSize: "history.replicatorTaskBatchSize", - ReplicatorTaskWorkerCount: "history.replicatorTaskWorkerCount", - ReplicatorTaskMaxRetryCount: "history.replicatorTaskMaxRetryCount", - ReplicatorProcessorStartDelay: "history.replicatorProcessorStartDelay", - ReplicatorProcessorMaxPollRPS: "history.replicatorProcessorMaxPollRPS", - ReplicatorProcessorUpdateShardTaskCount: "history.replicatorProcessorUpdateShardTaskCount", - ReplicatorProcessorMaxPollInterval: "history.replicatorProcessorMaxPollInterval", - ReplicatorProcessorMaxPollIntervalJitterCoefficient: "history.replicatorProcessorMaxPollIntervalJitterCoefficient", - ReplicatorProcessorUpdateAckInterval: "history.replicatorProcessorUpdateAckInterval", - ExecutionMgrNumConns: "history.executionMgrNumConns", - HistoryMgrNumConns: "history.historyMgrNumConns", - MaximumBufferedEventsBatch: "history.maximumBufferedEventsBatch", - ShardUpdateMinInterval: "history.shardUpdateMinInterval", - ShardSyncMinInterval: "history.shardSyncMinInterval", + HistoryPersistenceMaxQPS: "history.persistenceMaxQPS", + HistoryLongPollExpirationInterval: 
"history.longPollExpirationInterval", + HistoryCacheInitialSize: "history.cacheInitialSize", + HistoryCacheMaxSize: "history.cacheMaxSize", + HistoryCacheTTL: "history.cacheTTL", + AcquireShardInterval: "history.acquireShardInterval", + StandbyClusterDelay: "history.standbyClusterDelay", + TimerTaskBatchSize: "history.timerTaskBatchSize", + TimerTaskWorkerCount: "history.timerTaskWorkerCount", + TimerTaskMaxRetryCount: "history.timerTaskMaxRetryCount", + TimerProcessorStartDelay: "history.timerProcessorStartDelay", + TimerProcessorFailoverStartDelay: "history.timerProcessorFailoverStartDelay", + TimerProcessorGetFailureRetryCount: "history.timerProcessorGetFailureRetryCount", + TimerProcessorCompleteTimerFailureRetryCount: "history.timerProcessorCompleteTimerFailureRetryCount", + TimerProcessorUpdateShardTaskCount: "history.timerProcessorUpdateShardTaskCount", + TimerProcessorUpdateAckInterval: "history.timerProcessorUpdateAckInterval", + TimerProcessorUpdateAckIntervalJitterCoefficient: "history.timerProcessorUpdateAckIntervalJitterCoefficient", + TimerProcessorCompleteTimerInterval: "history.timerProcessorCompleteTimerInterval", + TimerProcessorFailoverMaxPollRPS: "history.timerProcessorFailoverMaxPollRPS", + TimerProcessorMaxPollRPS: "history.timerProcessorMaxPollRPS", + TimerProcessorMaxPollInterval: "history.timerProcessorMaxPollInterval", + TimerProcessorMaxPollIntervalJitterCoefficient: "history.timerProcessorMaxPollIntervalJitterCoefficient", + TransferTaskBatchSize: "history.transferTaskBatchSize", + TransferProcessorFailoverMaxPollRPS: "history.transferProcessorFailoverMaxPollRPS", + TransferProcessorMaxPollRPS: "history.transferProcessorMaxPollRPS", + TransferTaskWorkerCount: "history.transferTaskWorkerCount", + TransferTaskMaxRetryCount: "history.transferTaskMaxRetryCount", + TransferProcessorStartDelay: "history.transferProcessorStartDelay", + TransferProcessorFailoverStartDelay: "history.transferProcessorFailoverStartDelay", + 
TransferProcessorCompleteTransferFailureRetryCount: "history.transferProcessorCompleteTransferFailureRetryCount", + TransferProcessorUpdateShardTaskCount: "history.transferProcessorUpdateShardTaskCount", + TransferProcessorMaxPollInterval: "history.transferProcessorMaxPollInterval", + TransferProcessorMaxPollIntervalJitterCoefficient: "history.transferProcessorMaxPollIntervalJitterCoefficient", + TransferProcessorUpdateAckInterval: "history.transferProcessorUpdateAckInterval", + TransferProcessorUpdateAckIntervalJitterCoefficient: "history.transferProcessorUpdateAckIntervalJitterCoefficient", + TransferProcessorCompleteTransferInterval: "history.transferProcessorCompleteTransferInterval", + ReplicatorTaskBatchSize: "history.replicatorTaskBatchSize", + ReplicatorTaskWorkerCount: "history.replicatorTaskWorkerCount", + ReplicatorTaskMaxRetryCount: "history.replicatorTaskMaxRetryCount", + ReplicatorProcessorStartDelay: "history.replicatorProcessorStartDelay", + ReplicatorProcessorMaxPollRPS: "history.replicatorProcessorMaxPollRPS", + ReplicatorProcessorUpdateShardTaskCount: "history.replicatorProcessorUpdateShardTaskCount", + ReplicatorProcessorMaxPollInterval: "history.replicatorProcessorMaxPollInterval", + ReplicatorProcessorMaxPollIntervalJitterCoefficient: "history.replicatorProcessorMaxPollIntervalJitterCoefficient", + ReplicatorProcessorUpdateAckInterval: "history.replicatorProcessorUpdateAckInterval", + ReplicatorProcessorUpdateAckIntervalJitterCoefficient: "history.replicatorProcessorUpdateAckIntervalJitterCoefficient", + ExecutionMgrNumConns: "history.executionMgrNumConns", + HistoryMgrNumConns: "history.historyMgrNumConns", + MaximumBufferedEventsBatch: "history.maximumBufferedEventsBatch", + ShardUpdateMinInterval: "history.shardUpdateMinInterval", + ShardSyncMinInterval: "history.shardSyncMinInterval", // worker settings WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS", @@ -217,6 +220,8 @@ const ( TimerProcessorUpdateShardTaskCount // 
TimerProcessorUpdateAckInterval is update interval for timer processor TimerProcessorUpdateAckInterval + // TimerProcessorUpdateAckIntervalJitterCoefficient is the jitter coefficient applied to TimerProcessorUpdateAckInterval + TimerProcessorUpdateAckIntervalJitterCoefficient // TimerProcessorCompleteTimerInterval is complete timer interval for timer processor TimerProcessorCompleteTimerInterval // TimerProcessorFailoverMaxPollRPS is max poll rate per second for timer processor @@ -251,6 +256,8 @@ const ( TransferProcessorMaxPollIntervalJitterCoefficient // TransferProcessorUpdateAckInterval is update interval for transferQueueProcessor TransferProcessorUpdateAckInterval + // TransferProcessorUpdateAckIntervalJitterCoefficient is the jitter coefficient applied to TransferProcessorUpdateAckInterval + TransferProcessorUpdateAckIntervalJitterCoefficient // TransferProcessorCompleteTransferInterval is complete timer interval for transferQueueProcessor TransferProcessorCompleteTransferInterval // ReplicatorTaskBatchSize is batch size for ReplicatorProcessor @@ -271,6 +278,8 @@ const ( ReplicatorProcessorMaxPollIntervalJitterCoefficient // ReplicatorProcessorUpdateAckInterval is update interval for ReplicatorProcessor ReplicatorProcessorUpdateAckInterval + // ReplicatorProcessorUpdateAckIntervalJitterCoefficient is the jitter coefficient applied to ReplicatorProcessorUpdateAckInterval + ReplicatorProcessorUpdateAckIntervalJitterCoefficient // ExecutionMgrNumConns is persistence connections number for ExecutionManager ExecutionMgrNumConns // HistoryMgrNumConns is persistence connections number for HistoryManager diff --git a/service/history/queueProcessor.go b/service/history/queueProcessor.go index 6fe041a4bbc..9ffb31d8163 100644 --- a/service/history/queueProcessor.go +++ b/service/history/queueProcessor.go @@ -41,15 +41,16 @@ import ( type ( // QueueProcessorOptions is options passed to queue processor implementation QueueProcessorOptions struct { - StartDelay dynamicconfig.DurationPropertyFn - BatchSize dynamicconfig.IntPropertyFn - WorkerCount
dynamicconfig.IntPropertyFn - MaxPollRPS dynamicconfig.IntPropertyFn - MaxPollInterval dynamicconfig.DurationPropertyFn - MaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn - UpdateAckInterval dynamicconfig.DurationPropertyFn - MaxRetryCount dynamicconfig.IntPropertyFn - MetricScope int + StartDelay dynamicconfig.DurationPropertyFn + BatchSize dynamicconfig.IntPropertyFn + WorkerCount dynamicconfig.IntPropertyFn + MaxPollRPS dynamicconfig.IntPropertyFn + MaxPollInterval dynamicconfig.DurationPropertyFn + MaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn + UpdateAckInterval dynamicconfig.DurationPropertyFn + UpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn + MaxRetryCount dynamicconfig.IntPropertyFn + MetricScope int } queueProcessorBase struct { @@ -155,11 +156,17 @@ func (p *queueProcessorBase) processorPump() { } jitter := backoff.NewJitter() - pollTimer := time.NewTimer(jitter.JitDuration(p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient())) + pollTimer := time.NewTimer(jitter.JitDuration( + p.options.MaxPollInterval(), + p.options.MaxPollIntervalJitterCoefficient(), + )) defer pollTimer.Stop() - updateAckTicker := time.NewTicker(p.options.UpdateAckInterval()) - defer updateAckTicker.Stop() + updateAckTimer := time.NewTimer(jitter.JitDuration( + p.options.UpdateAckInterval(), + p.options.UpdateAckIntervalJitterCoefficient(), + )) + defer updateAckTimer.Stop() processorPumpLoop: for { @@ -172,11 +179,18 @@ processorPumpLoop: case <-p.notifyCh: p.processBatch(tasksCh) case <-pollTimer.C: - pollTimer.Reset(jitter.JitDuration(p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient())) + pollTimer.Reset(jitter.JitDuration( + p.options.MaxPollInterval(), + p.options.MaxPollIntervalJitterCoefficient(), + )) if p.lastPollTime.Add(p.options.MaxPollInterval()).Before(time.Now()) { p.processBatch(tasksCh) } - case <-updateAckTicker.C: + case <-updateAckTimer.C: + 
updateAckTimer.Reset(jitter.JitDuration( + p.options.UpdateAckInterval(), + p.options.UpdateAckIntervalJitterCoefficient(), + )) p.ackMgr.updateQueueAckLevel() } } diff --git a/service/history/replicatorQueueProcessor.go b/service/history/replicatorQueueProcessor.go index bb869f5dd2c..72ec635bc26 100644 --- a/service/history/replicatorQueueProcessor.go +++ b/service/history/replicatorQueueProcessor.go @@ -68,15 +68,16 @@ func newReplicatorQueueProcessor(shard ShardContext, replicator messaging.Produc config := shard.GetConfig() options := &QueueProcessorOptions{ - StartDelay: config.ReplicatorProcessorStartDelay, - BatchSize: config.ReplicatorTaskBatchSize, - WorkerCount: config.ReplicatorTaskWorkerCount, - MaxPollRPS: config.ReplicatorProcessorMaxPollRPS, - MaxPollInterval: config.ReplicatorProcessorMaxPollInterval, - MaxPollIntervalJitterCoefficient: config.ReplicatorProcessorMaxPollIntervalJitterCoefficient, - UpdateAckInterval: config.ReplicatorProcessorUpdateAckInterval, - MaxRetryCount: config.ReplicatorTaskMaxRetryCount, - MetricScope: metrics.ReplicatorQueueProcessorScope, + StartDelay: config.ReplicatorProcessorStartDelay, + BatchSize: config.ReplicatorTaskBatchSize, + WorkerCount: config.ReplicatorTaskWorkerCount, + MaxPollRPS: config.ReplicatorProcessorMaxPollRPS, + MaxPollInterval: config.ReplicatorProcessorMaxPollInterval, + MaxPollIntervalJitterCoefficient: config.ReplicatorProcessorMaxPollIntervalJitterCoefficient, + UpdateAckInterval: config.ReplicatorProcessorUpdateAckInterval, + UpdateAckIntervalJitterCoefficient: config.ReplicatorProcessorUpdateAckIntervalJitterCoefficient, + MaxRetryCount: config.ReplicatorTaskMaxRetryCount, + MetricScope: metrics.ReplicatorQueueProcessorScope, } logger = logger.WithFields(bark.Fields{ diff --git a/service/history/service.go b/service/history/service.go index 101150ccd4f..3ccffcbbada 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -53,43 +53,46 @@ type Config struct { 
StandbyClusterDelay dynamicconfig.DurationPropertyFn // TimerQueueProcessor settings - TimerTaskBatchSize dynamicconfig.IntPropertyFn - TimerTaskWorkerCount dynamicconfig.IntPropertyFn - TimerTaskMaxRetryCount dynamicconfig.IntPropertyFn - TimerProcessorStartDelay dynamicconfig.DurationPropertyFn - TimerProcessorFailoverStartDelay dynamicconfig.DurationPropertyFn - TimerProcessorGetFailureRetryCount dynamicconfig.IntPropertyFn - TimerProcessorCompleteTimerFailureRetryCount dynamicconfig.IntPropertyFn - TimerProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn - TimerProcessorCompleteTimerInterval dynamicconfig.DurationPropertyFn - TimerProcessorFailoverMaxPollRPS dynamicconfig.IntPropertyFn - TimerProcessorMaxPollRPS dynamicconfig.IntPropertyFn - TimerProcessorMaxPollInterval dynamicconfig.DurationPropertyFn - TimerProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn + TimerTaskBatchSize dynamicconfig.IntPropertyFn + TimerTaskWorkerCount dynamicconfig.IntPropertyFn + TimerTaskMaxRetryCount dynamicconfig.IntPropertyFn + TimerProcessorStartDelay dynamicconfig.DurationPropertyFn + TimerProcessorFailoverStartDelay dynamicconfig.DurationPropertyFn + TimerProcessorGetFailureRetryCount dynamicconfig.IntPropertyFn + TimerProcessorCompleteTimerFailureRetryCount dynamicconfig.IntPropertyFn + TimerProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn + TimerProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn + TimerProcessorCompleteTimerInterval dynamicconfig.DurationPropertyFn + TimerProcessorFailoverMaxPollRPS dynamicconfig.IntPropertyFn + TimerProcessorMaxPollRPS dynamicconfig.IntPropertyFn + TimerProcessorMaxPollInterval dynamicconfig.DurationPropertyFn + TimerProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn // TransferQueueProcessor settings - TransferTaskBatchSize dynamicconfig.IntPropertyFn - TransferTaskWorkerCount dynamicconfig.IntPropertyFn - TransferTaskMaxRetryCount 
dynamicconfig.IntPropertyFn - TransferProcessorStartDelay dynamicconfig.DurationPropertyFn - TransferProcessorFailoverStartDelay dynamicconfig.DurationPropertyFn - TransferProcessorCompleteTransferFailureRetryCount dynamicconfig.IntPropertyFn - TransferProcessorFailoverMaxPollRPS dynamicconfig.IntPropertyFn - TransferProcessorMaxPollRPS dynamicconfig.IntPropertyFn - TransferProcessorMaxPollInterval dynamicconfig.DurationPropertyFn - TransferProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn - TransferProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn - TransferProcessorCompleteTransferInterval dynamicconfig.DurationPropertyFn + TransferTaskBatchSize dynamicconfig.IntPropertyFn + TransferTaskWorkerCount dynamicconfig.IntPropertyFn + TransferTaskMaxRetryCount dynamicconfig.IntPropertyFn + TransferProcessorStartDelay dynamicconfig.DurationPropertyFn + TransferProcessorFailoverStartDelay dynamicconfig.DurationPropertyFn + TransferProcessorCompleteTransferFailureRetryCount dynamicconfig.IntPropertyFn + TransferProcessorFailoverMaxPollRPS dynamicconfig.IntPropertyFn + TransferProcessorMaxPollRPS dynamicconfig.IntPropertyFn + TransferProcessorMaxPollInterval dynamicconfig.DurationPropertyFn + TransferProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn + TransferProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn + TransferProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn + TransferProcessorCompleteTransferInterval dynamicconfig.DurationPropertyFn // ReplicatorQueueProcessor settings - ReplicatorTaskBatchSize dynamicconfig.IntPropertyFn - ReplicatorTaskWorkerCount dynamicconfig.IntPropertyFn - ReplicatorTaskMaxRetryCount dynamicconfig.IntPropertyFn - ReplicatorProcessorStartDelay dynamicconfig.DurationPropertyFn - ReplicatorProcessorMaxPollRPS dynamicconfig.IntPropertyFn - ReplicatorProcessorMaxPollInterval dynamicconfig.DurationPropertyFn - ReplicatorProcessorMaxPollIntervalJitterCoefficient 
dynamicconfig.FloatPropertyFn - ReplicatorProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn + ReplicatorTaskBatchSize dynamicconfig.IntPropertyFn + ReplicatorTaskWorkerCount dynamicconfig.IntPropertyFn + ReplicatorTaskMaxRetryCount dynamicconfig.IntPropertyFn + ReplicatorProcessorStartDelay dynamicconfig.DurationPropertyFn + ReplicatorProcessorMaxPollRPS dynamicconfig.IntPropertyFn + ReplicatorProcessorMaxPollInterval dynamicconfig.DurationPropertyFn + ReplicatorProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn + ReplicatorProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn + ReplicatorProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn // Persistence settings ExecutionMgrNumConns dynamicconfig.IntPropertyFn @@ -111,52 +114,55 @@ type Config struct { // NewConfig returns new service config with default values func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config { return &Config{ - NumberOfShards: numberOfShards, - PersistenceMaxQPS: dc.GetFloat64Property(dynamicconfig.HistoryPersistenceMaxQPS, 9000), - HistoryCacheInitialSize: dc.GetIntProperty(dynamicconfig.HistoryCacheInitialSize, 128), - HistoryCacheMaxSize: dc.GetIntProperty(dynamicconfig.HistoryCacheMaxSize, 512), - HistoryCacheTTL: dc.GetDurationProperty(dynamicconfig.HistoryCacheTTL, time.Hour), - RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range - AcquireShardInterval: dc.GetDurationProperty(dynamicconfig.AcquireShardInterval, time.Minute), - StandbyClusterDelay: dc.GetDurationProperty(dynamicconfig.AcquireShardInterval, 5*time.Minute), - TimerTaskBatchSize: dc.GetIntProperty(dynamicconfig.TimerTaskBatchSize, 100), - TimerTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TimerTaskWorkerCount, 10), - TimerTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TimerTaskMaxRetryCount, 100), - TimerProcessorStartDelay: dc.GetDurationProperty(dynamicconfig.TimerProcessorStartDelay, 1*time.Microsecond), - 
TimerProcessorFailoverStartDelay: dc.GetDurationProperty(dynamicconfig.TimerProcessorFailoverStartDelay, 5*time.Second), - TimerProcessorGetFailureRetryCount: dc.GetIntProperty(dynamicconfig.TimerProcessorGetFailureRetryCount, 5), - TimerProcessorCompleteTimerFailureRetryCount: dc.GetIntProperty(dynamicconfig.TimerProcessorCompleteTimerFailureRetryCount, 10), - TimerProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorUpdateAckInterval, 5*time.Second), - TimerProcessorCompleteTimerInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorCompleteTimerInterval, 3*time.Second), - TimerProcessorFailoverMaxPollRPS: dc.GetIntProperty(dynamicconfig.TimerProcessorFailoverMaxPollRPS, 1), - TimerProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.TimerProcessorMaxPollRPS, 20), - TimerProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorMaxPollInterval, 5*time.Minute), - TimerProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TimerProcessorMaxPollIntervalJitterCoefficient, 0.15), - TransferTaskBatchSize: dc.GetIntProperty(dynamicconfig.TransferTaskBatchSize, 100), - TransferProcessorFailoverMaxPollRPS: dc.GetIntProperty(dynamicconfig.TransferProcessorFailoverMaxPollRPS, 1), - TransferProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.TransferProcessorMaxPollRPS, 20), - TransferTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TransferTaskWorkerCount, 10), - TransferTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TransferTaskMaxRetryCount, 100), - TransferProcessorStartDelay: dc.GetDurationProperty(dynamicconfig.TransferProcessorStartDelay, 1*time.Microsecond), - TransferProcessorFailoverStartDelay: dc.GetDurationProperty(dynamicconfig.TransferProcessorFailoverStartDelay, 5*time.Second), - TransferProcessorCompleteTransferFailureRetryCount: dc.GetIntProperty(dynamicconfig.TransferProcessorCompleteTransferFailureRetryCount, 10), - TransferProcessorMaxPollInterval: 
dc.GetDurationProperty(dynamicconfig.TransferProcessorMaxPollInterval, 1*time.Minute), - TransferProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TransferProcessorMaxPollIntervalJitterCoefficient, 0.15), - TransferProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorUpdateAckInterval, 5*time.Second), - TransferProcessorCompleteTransferInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorCompleteTransferInterval, 3*time.Second), - ReplicatorTaskBatchSize: dc.GetIntProperty(dynamicconfig.ReplicatorTaskBatchSize, 100), - ReplicatorTaskWorkerCount: dc.GetIntProperty(dynamicconfig.ReplicatorTaskWorkerCount, 10), - ReplicatorTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.ReplicatorTaskMaxRetryCount, 100), - ReplicatorProcessorStartDelay: dc.GetDurationProperty(dynamicconfig.ReplicatorProcessorStartDelay, 1*time.Microsecond), - ReplicatorProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.ReplicatorProcessorMaxPollRPS, 20), - ReplicatorProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.ReplicatorProcessorMaxPollInterval, 1*time.Minute), - ReplicatorProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicatorProcessorMaxPollIntervalJitterCoefficient, 0.15), - ReplicatorProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.ReplicatorProcessorUpdateAckInterval, 5*time.Second), - ExecutionMgrNumConns: dc.GetIntProperty(dynamicconfig.ExecutionMgrNumConns, 50), - HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.HistoryMgrNumConns, 50), - MaximumBufferedEventsBatch: dc.GetIntProperty(dynamicconfig.MaximumBufferedEventsBatch, 100), - ShardUpdateMinInterval: dc.GetDurationProperty(dynamicconfig.ShardUpdateMinInterval, 5*time.Minute), - ShardSyncMinInterval: dc.GetDurationProperty(dynamicconfig.ShardSyncMinInterval, 5*time.Minute), + NumberOfShards: numberOfShards, + PersistenceMaxQPS: dc.GetFloat64Property(dynamicconfig.HistoryPersistenceMaxQPS, 9000), 
+ HistoryCacheInitialSize: dc.GetIntProperty(dynamicconfig.HistoryCacheInitialSize, 128), + HistoryCacheMaxSize: dc.GetIntProperty(dynamicconfig.HistoryCacheMaxSize, 512), + HistoryCacheTTL: dc.GetDurationProperty(dynamicconfig.HistoryCacheTTL, time.Hour), + RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range + AcquireShardInterval: dc.GetDurationProperty(dynamicconfig.AcquireShardInterval, time.Minute), + StandbyClusterDelay: dc.GetDurationProperty(dynamicconfig.StandbyClusterDelay, 5*time.Minute), // read from its own key (history.standbyClusterDelay) + TimerTaskBatchSize: dc.GetIntProperty(dynamicconfig.TimerTaskBatchSize, 100), + TimerTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TimerTaskWorkerCount, 10), + TimerTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TimerTaskMaxRetryCount, 100), + TimerProcessorStartDelay: dc.GetDurationProperty(dynamicconfig.TimerProcessorStartDelay, 1*time.Microsecond), + TimerProcessorFailoverStartDelay: dc.GetDurationProperty(dynamicconfig.TimerProcessorFailoverStartDelay, 5*time.Second), + TimerProcessorGetFailureRetryCount: dc.GetIntProperty(dynamicconfig.TimerProcessorGetFailureRetryCount, 5), + TimerProcessorCompleteTimerFailureRetryCount: dc.GetIntProperty(dynamicconfig.TimerProcessorCompleteTimerFailureRetryCount, 10), + TimerProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorUpdateAckInterval, 5*time.Second), + TimerProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TimerProcessorUpdateAckIntervalJitterCoefficient, 0.15), + TimerProcessorCompleteTimerInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorCompleteTimerInterval, 3*time.Second), + TimerProcessorFailoverMaxPollRPS: dc.GetIntProperty(dynamicconfig.TimerProcessorFailoverMaxPollRPS, 1), + TimerProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.TimerProcessorMaxPollRPS, 20), + TimerProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorMaxPollInterval, 5*time.Minute), +
TimerProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TimerProcessorMaxPollIntervalJitterCoefficient, 0.15), + TransferTaskBatchSize: dc.GetIntProperty(dynamicconfig.TransferTaskBatchSize, 100), + TransferProcessorFailoverMaxPollRPS: dc.GetIntProperty(dynamicconfig.TransferProcessorFailoverMaxPollRPS, 1), + TransferProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.TransferProcessorMaxPollRPS, 20), + TransferTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TransferTaskWorkerCount, 10), + TransferTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TransferTaskMaxRetryCount, 100), + TransferProcessorStartDelay: dc.GetDurationProperty(dynamicconfig.TransferProcessorStartDelay, 1*time.Microsecond), + TransferProcessorFailoverStartDelay: dc.GetDurationProperty(dynamicconfig.TransferProcessorFailoverStartDelay, 5*time.Second), + TransferProcessorCompleteTransferFailureRetryCount: dc.GetIntProperty(dynamicconfig.TransferProcessorCompleteTransferFailureRetryCount, 10), + TransferProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorMaxPollInterval, 1*time.Minute), + TransferProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TransferProcessorMaxPollIntervalJitterCoefficient, 0.15), + TransferProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorUpdateAckInterval, 5*time.Second), + TransferProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TransferProcessorUpdateAckIntervalJitterCoefficient, 0.15), + TransferProcessorCompleteTransferInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorCompleteTransferInterval, 3*time.Second), + ReplicatorTaskBatchSize: dc.GetIntProperty(dynamicconfig.ReplicatorTaskBatchSize, 100), + ReplicatorTaskWorkerCount: dc.GetIntProperty(dynamicconfig.ReplicatorTaskWorkerCount, 10), + ReplicatorTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.ReplicatorTaskMaxRetryCount, 100), + 
ReplicatorProcessorStartDelay: dc.GetDurationProperty(dynamicconfig.ReplicatorProcessorStartDelay, 1*time.Microsecond), + ReplicatorProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.ReplicatorProcessorMaxPollRPS, 20), + ReplicatorProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.ReplicatorProcessorMaxPollInterval, 1*time.Minute), + ReplicatorProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicatorProcessorMaxPollIntervalJitterCoefficient, 0.15), + ReplicatorProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.ReplicatorProcessorUpdateAckInterval, 5*time.Second), + ReplicatorProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicatorProcessorUpdateAckIntervalJitterCoefficient, 0.15), + ExecutionMgrNumConns: dc.GetIntProperty(dynamicconfig.ExecutionMgrNumConns, 50), + HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.HistoryMgrNumConns, 50), + MaximumBufferedEventsBatch: dc.GetIntProperty(dynamicconfig.MaximumBufferedEventsBatch, 100), + ShardUpdateMinInterval: dc.GetDurationProperty(dynamicconfig.ShardUpdateMinInterval, 5*time.Minute), + ShardSyncMinInterval: dc.GetDurationProperty(dynamicconfig.ShardSyncMinInterval, 5*time.Minute), // history client: client/history/client.go set the client timeout 30s LongPollExpirationInterval: dc.GetDurationPropertyFilteredByDomain( dynamicconfig.HistoryLongPollExpirationInterval, time.Second*20, diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go index c55a12934c5..80bafeed0a2 100644 --- a/service/history/timerQueueProcessorBase.go +++ b/service/history/timerQueueProcessorBase.go @@ -279,8 +279,11 @@ func (t *timerQueueProcessorBase) internalProcessor() error { )) defer pollTimer.Stop() - updateAckTicker := time.NewTicker(t.shard.GetConfig().TimerProcessorUpdateAckInterval()) - defer updateAckTicker.Stop() + updateAckTimer := time.NewTimer(jitter.JitDuration( + 
t.config.TimerProcessorUpdateAckInterval(), + t.config.TimerProcessorUpdateAckIntervalJitterCoefficient(), + )) + defer updateAckTimer.Stop() for { // Wait until one of four things occurs: @@ -321,7 +324,11 @@ func (t *timerQueueProcessorBase) internalProcessor() error { timerGate.Update(lookAheadTimer.VisibilityTimestamp) } } - case <-updateAckTicker.C: + case <-updateAckTimer.C: + updateAckTimer.Reset(jitter.JitDuration( + t.config.TimerProcessorUpdateAckInterval(), + t.config.TimerProcessorUpdateAckIntervalJitterCoefficient(), + )) t.timerQueueAckMgr.updateAckLevel() case <-t.newTimerCh: t.newTimeLock.Lock() diff --git a/service/history/transferQueueActiveProcessor.go b/service/history/transferQueueActiveProcessor.go index 53c7b8ed6d2..d7ae58c4e3f 100644 --- a/service/history/transferQueueActiveProcessor.go +++ b/service/history/transferQueueActiveProcessor.go @@ -69,15 +69,16 @@ func newTransferQueueActiveProcessor(shard ShardContext, historyService *history matchingClient matching.Client, historyClient history.Client, logger bark.Logger) *transferQueueActiveProcessorImpl { config := shard.GetConfig() options := &QueueProcessorOptions{ - StartDelay: config.TransferProcessorStartDelay, - BatchSize: config.TransferTaskBatchSize, - WorkerCount: config.TransferTaskWorkerCount, - MaxPollRPS: config.TransferProcessorMaxPollRPS, - MaxPollInterval: config.TransferProcessorMaxPollInterval, - MaxPollIntervalJitterCoefficient: config.TransferProcessorMaxPollIntervalJitterCoefficient, - UpdateAckInterval: config.TransferProcessorUpdateAckInterval, - MaxRetryCount: config.TransferTaskMaxRetryCount, - MetricScope: metrics.TransferActiveQueueProcessorScope, + StartDelay: config.TransferProcessorStartDelay, + BatchSize: config.TransferTaskBatchSize, + WorkerCount: config.TransferTaskWorkerCount, + MaxPollRPS: config.TransferProcessorMaxPollRPS, + MaxPollInterval: config.TransferProcessorMaxPollInterval, + MaxPollIntervalJitterCoefficient: 
config.TransferProcessorMaxPollIntervalJitterCoefficient, + UpdateAckInterval: config.TransferProcessorUpdateAckInterval, + UpdateAckIntervalJitterCoefficient: config.TransferProcessorUpdateAckIntervalJitterCoefficient, + MaxRetryCount: config.TransferTaskMaxRetryCount, + MetricScope: metrics.TransferActiveQueueProcessorScope, } currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName() logger = logger.WithFields(bark.Fields{ @@ -128,15 +129,16 @@ func newTransferQueueFailoverProcessor(shard ShardContext, historyService *histo minLevel int64, maxLevel int64, logger bark.Logger) *transferQueueActiveProcessorImpl { config := shard.GetConfig() options := &QueueProcessorOptions{ - StartDelay: config.TransferProcessorFailoverStartDelay, - BatchSize: config.TransferTaskBatchSize, - WorkerCount: config.TransferTaskWorkerCount, - MaxPollRPS: config.TransferProcessorFailoverMaxPollRPS, - MaxPollInterval: config.TransferProcessorMaxPollInterval, - MaxPollIntervalJitterCoefficient: config.TransferProcessorMaxPollIntervalJitterCoefficient, - UpdateAckInterval: config.TransferProcessorUpdateAckInterval, - MaxRetryCount: config.TransferTaskMaxRetryCount, - MetricScope: metrics.TransferActiveQueueProcessorScope, + StartDelay: config.TransferProcessorFailoverStartDelay, + BatchSize: config.TransferTaskBatchSize, + WorkerCount: config.TransferTaskWorkerCount, + MaxPollRPS: config.TransferProcessorFailoverMaxPollRPS, + MaxPollInterval: config.TransferProcessorMaxPollInterval, + MaxPollIntervalJitterCoefficient: config.TransferProcessorMaxPollIntervalJitterCoefficient, + UpdateAckInterval: config.TransferProcessorUpdateAckInterval, + UpdateAckIntervalJitterCoefficient: config.TransferProcessorUpdateAckIntervalJitterCoefficient, + MaxRetryCount: config.TransferTaskMaxRetryCount, + MetricScope: metrics.TransferActiveQueueProcessorScope, } currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName() logger = 
logger.WithFields(bark.Fields{ diff --git a/service/history/transferQueueStandbyProcessor.go b/service/history/transferQueueStandbyProcessor.go index 81ab903b731..d3f28c89222 100644 --- a/service/history/transferQueueStandbyProcessor.go +++ b/service/history/transferQueueStandbyProcessor.go @@ -51,15 +51,16 @@ func newTransferQueueStandbyProcessor(clusterName string, shard ShardContext, hi visibilityMgr persistence.VisibilityManager, logger bark.Logger) *transferQueueStandbyProcessorImpl { config := shard.GetConfig() options := &QueueProcessorOptions{ - StartDelay: config.TransferProcessorStartDelay, - BatchSize: config.TransferTaskBatchSize, - WorkerCount: config.TransferTaskWorkerCount, - MaxPollRPS: config.TransferProcessorMaxPollRPS, - MaxPollInterval: config.TransferProcessorMaxPollInterval, - MaxPollIntervalJitterCoefficient: config.TransferProcessorMaxPollIntervalJitterCoefficient, - UpdateAckInterval: config.TransferProcessorUpdateAckInterval, - MaxRetryCount: config.TransferTaskMaxRetryCount, - MetricScope: metrics.TransferStandbyQueueProcessorScope, + StartDelay: config.TransferProcessorStartDelay, + BatchSize: config.TransferTaskBatchSize, + WorkerCount: config.TransferTaskWorkerCount, + MaxPollRPS: config.TransferProcessorMaxPollRPS, + MaxPollInterval: config.TransferProcessorMaxPollInterval, + MaxPollIntervalJitterCoefficient: config.TransferProcessorMaxPollIntervalJitterCoefficient, + UpdateAckInterval: config.TransferProcessorUpdateAckInterval, + UpdateAckIntervalJitterCoefficient: config.TransferProcessorUpdateAckIntervalJitterCoefficient, + MaxRetryCount: config.TransferTaskMaxRetryCount, + MetricScope: metrics.TransferStandbyQueueProcessorScope, } logger = logger.WithFields(bark.Fields{ logging.TagWorkflowCluster: clusterName,