From d3bb8f0d4575d2468ddfee462209c507fa7a7808 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Fri, 27 Jan 2023 16:39:11 -0800 Subject: [PATCH 1/4] Improve task scheduler rate limiter --- common/dynamicconfig/constants.go | 5 + common/metrics/metric_defs.go | 1 + common/tasks/benchmark_test.go | 23 ++- .../tasks/interleaved_weighted_round_robin.go | 179 +++++++++++------- .../interleaved_weighted_round_robin_test.go | 14 +- service/history/archival_queue_factory.go | 11 +- .../history/archival_queue_factory_test.go | 2 +- .../history/archival_queue_task_executor.go | 3 + service/history/configs/config.go | 16 +- service/history/queueFactoryBase.go | 8 - .../history/queues/queue_scheduled_test.go | 6 +- service/history/queues/scheduler.go | 32 +++- service/history/timerQueueFactory.go | 3 +- service/history/transferQueueFactory.go | 3 +- service/history/visibilityQueueFactory.go | 3 +- 15 files changed, 192 insertions(+), 117 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index f3dbb2e77f0..b96aa6baecd 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -385,6 +385,11 @@ const ( // TaskSchedulerEnableRateLimiter indicates if rate limiter should be enabled in task scheduler TaskSchedulerEnableRateLimiter = "history.taskSchedulerEnableRateLimiter" + // TaskSchedulerEnableRateLimiterShadowMode indicates if task scheduler rate limiter should run in shadow mode + // i.e. through rate limiter and emit metrics but do not actually block/throttle task scheduling + TaskSchedulerEnableRateLimiterShadowMode = "history.taskSchedulerEnableRateLimiterShadowMode" + // TaskSchedulerThrottleDuration is the throttle duration when task scheduled exceeds max qps + TaskSchedulerThrottleDuration = "history.taskSchedulerThrottleDuration" // TaskSchedulerMaxQPS is the max qps task schedulers on a host can schedule tasks // If value less or equal to 0, will fall back to HistoryPersistenceMaxQPS TaskSchedulerMaxQPS = "history.taskSchedulerMaxQPS" diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 2836a689559..ffe8c00d2a6 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1439,6 +1439,7 @@ var ( TaskBatchCompleteCounter = NewCounterDef("task_batch_complete_counter") TaskReschedulerPendingTasks = NewDimensionlessHistogramDef("task_rescheduler_pending_tasks") PendingTasksCounter = NewDimensionlessHistogramDef("pending_tasks") + TaskSchedulerThrottled = NewCounterDef("task_scheduler_throttled") QueueScheduleLatency = NewTimerDef("queue_latency_schedule") // latency for scheduling 100 tasks in one task channel QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count") QueueSliceCountHistogram = NewDimensionlessHistogramDef("queue_slice_count") diff --git a/common/tasks/benchmark_test.go b/common/tasks/benchmark_test.go index ad767ded1f0..a4325851a1c 100644 --- a/common/tasks/benchmark_test.go +++ b/common/tasks/benchmark_test.go @@ -33,6 +33,7 @@ import ( "go.temporal.io/server/common/clock" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/quotas" ) @@ -62,15 +63,18 @@ func BenchmarkInterleavedWeightedRoundRobinScheduler_Sequential(b *testing.B) { scheduler := NewInterleavedWeightedRoundRobinScheduler( InterleavedWeightedRoundRobinSchedulerOptions[*noopTask, int]{ - TaskChannelKeyFn: func(nt *noopTask) int { return rand.Intn(4) }, - ChannelWeightFn: func(key int) int { return channelKeyToWeight[key] }, - ChannelQuotaRequestFn: func(key int) quotas.Request { return quotas.NewRequest("", 1, "", "", "") }, - EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true), + TaskChannelKeyFn: func(nt *noopTask) int { return rand.Intn(4) }, + ChannelWeightFn: func(key int) int { return channelKeyToWeight[key] }, + ChannelQuotaRequestFn: func(key int) quotas.Request { return quotas.NewRequest("", 1, "", "", "") }, + TaskChannelMetricTagsFn: func(key int) []metrics.Tag { return nil }, + EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true), + EnableRateLimiterShadowMode: dynamicconfig.GetBoolPropertyFn(false), }, Scheduler[*noopTask](&noopScheduler{}), quotas.NoopRequestRateLimiter, clock.NewRealTimeSource(), logger, + metrics.NoopMetricsHandler, ) scheduler.Start() defer scheduler.Stop() @@ -92,15 +96,18 @@ func BenchmarkInterleavedWeightedRoundRobinScheduler_Parallel(b *testing.B) { scheduler := NewInterleavedWeightedRoundRobinScheduler( InterleavedWeightedRoundRobinSchedulerOptions[*noopTask, int]{ - TaskChannelKeyFn: func(nt *noopTask) int { return rand.Intn(4) }, - ChannelWeightFn: func(key int) int { return channelKeyToWeight[key] }, - ChannelQuotaRequestFn: func(key int) quotas.Request { return quotas.NewRequest("", 1, "", "", "") }, - EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true), + TaskChannelKeyFn: func(nt *noopTask) int { return rand.Intn(4) }, + ChannelWeightFn: func(key int) int { return channelKeyToWeight[key] }, + ChannelQuotaRequestFn: func(key int) quotas.Request { return quotas.NewRequest("", 1, "", "", "") }, + TaskChannelMetricTagsFn: func(key int) []metrics.Tag { return nil }, + EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true), + EnableRateLimiterShadowMode: dynamicconfig.GetBoolPropertyFn(false), }, Scheduler[*noopTask](&noopScheduler{}), quotas.NoopRequestRateLimiter, clock.NewRealTimeSource(), logger, + metrics.NoopMetricsHandler, ) scheduler.Start() defer scheduler.Stop() diff --git a/common/tasks/interleaved_weighted_round_robin.go b/common/tasks/interleaved_weighted_round_robin.go index a96a5cc7ca3..67b3f59cb16 100644 --- a/common/tasks/interleaved_weighted_round_robin.go +++ b/common/tasks/interleaved_weighted_round_robin.go @@ -32,16 +32,14 @@ import ( "time" "go.temporal.io/server/common" - "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/quotas" - "go.temporal.io/server/common/util" ) const ( - iwrrMinDispatchThrottleDuration = 1 * time.Second checkRateLimiterEnabledInterval = 1 * time.Minute ) @@ -59,12 +57,17 @@ type ( ChannelWeightUpdateCh chan struct{} // Required for converting task channel to rate limit request ChannelQuotaRequestFn ChannelQuotaRequestFn[K] - // Required for determining if rate limiter should be enabled. + // Required for getting metrics tags for task channels + TaskChannelMetricTagsFn TaskChannelMetricTagsFn[K] + // Required for determining if rate limiter should be enabled EnableRateLimiter dynamicconfig.BoolPropertyFn - // Optional, if specified and greater than 1s the throttle duration will be a random - // value between 1s to the value specified. - // If not specified or not valid, the throttle duration will always be 1s. - MaxDispatchThrottleDuration time.Duration + // Required for determining if task should still go through rate limiter and + // emit metrics, but not actually block task dispatching. + // only takes effect when rate limiter is not enabled + EnableRateLimiterShadowMode dynamicconfig.BoolPropertyFn + // Required for determining how long scheduler should be throttled + // when exceeding allowed dispatch rate + DispatchThrottleDuration dynamicconfig.DurationPropertyFn } // TaskChannelKeyFn is the function for mapping a task to its task channel (key) @@ -76,15 +79,19 @@ type ( // ChannelQuotaRequestFn is the function for mapping a task channel (key) to its rate limit request ChannelQuotaRequestFn[K comparable] func(K) quotas.Request + // TaskChannelMetricTagsFn is the function for mapping a task channel (key) to its metrics tags + TaskChannelMetricTagsFn[K comparable] func(K) []metrics.Tag + // InterleavedWeightedRoundRobinScheduler is a round robin scheduler implementation // ref: https://en.wikipedia.org/wiki/Weighted_round_robin#Interleaved_WRR InterleavedWeightedRoundRobinScheduler[T Task, K comparable] struct { status int32 - fifoScheduler Scheduler[T] - rateLimiter quotas.RequestRateLimiter - timeSource clock.TimeSource - logger log.Logger + fifoScheduler Scheduler[T] + rateLimiter quotas.RequestRateLimiter + timeSource clock.TimeSource + logger log.Logger + metricsHandler metrics.Handler notifyChan chan struct{} shutdownChan chan struct{} @@ -96,10 +103,10 @@ type ( sync.RWMutex weightedChannels map[K]*WeightedChannel[T] - dispatchTimerLock sync.Mutex - dispatchTimer *time.Timer - rateLimiterEnabled atomic.Value - iwrrChannels atomic.Value + dispatchTimerLock sync.Mutex + dispatchTimer *time.Timer + rateLimiterConfig atomic.Value // rateLimitConfig + iwrrChannels atomic.Value } channelWithStatus[T Task, K comparable] struct { @@ -107,13 +114,18 @@ type ( key K rateLimitRequest quotas.Request + metricsTags []metrics.Tag throttled bool - moreTasks bool // this is only a hint since there's no way to peek the channel } channelsWithStatus[T Task, K comparable] []*channelWithStatus[T, K] + rateLimiterConfig struct { + enableThrottle bool + enableShadowMode bool + } + iwrrChannels[T Task, K comparable] struct { channels channelsWithStatus[T, K] @@ -135,31 +147,37 @@ func NewInterleavedWeightedRoundRobinScheduler[T Task, K comparable]( rateLimiter quotas.RequestRateLimiter, timeSource clock.TimeSource, logger log.Logger, + metricsHandler metrics.Handler, ) *InterleavedWeightedRoundRobinScheduler[T, K] { channels := atomic.Value{} channels.Store(iwrrChannels[T, K]{}) - enableRateLimiter := atomic.Value{} - enableRateLimiter.Store(options.EnableRateLimiter()) + rlConfig := atomic.Value{} + rlConfig.Store( + rateLimiterConfig{ + enableThrottle: options.EnableRateLimiter(), + enableShadowMode: options.EnableRateLimiterShadowMode(), + }, + ) - options.MaxDispatchThrottleDuration = util.Max(iwrrMinDispatchThrottleDuration, options.MaxDispatchThrottleDuration) return &InterleavedWeightedRoundRobinScheduler[T, K]{ status: common.DaemonStatusInitialized, - fifoScheduler: fifoScheduler, - rateLimiter: rateLimiter, - timeSource: timeSource, - logger: logger, + fifoScheduler: fifoScheduler, + rateLimiter: rateLimiter, + timeSource: timeSource, + logger: logger, + metricsHandler: metricsHandler, options: options, notifyChan: make(chan struct{}, 1), shutdownChan: make(chan struct{}), - numInflightTask: 0, - weightedChannels: make(map[K]*WeightedChannel[T]), - rateLimiterEnabled: enableRateLimiter, - iwrrChannels: channels, + numInflightTask: 0, + weightedChannels: make(map[K]*WeightedChannel[T]), + rateLimiterConfig: rlConfig, + iwrrChannels: channels, } } @@ -244,7 +262,10 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) eventLoop() { case <-s.notifyChan: s.dispatchTasksWithWeight() case <-checkRateLimiterEnabledTimer.C: - s.rateLimiterEnabled.Store(s.options.EnableRateLimiter()) + s.rateLimiterConfig.Store(rateLimiterConfig{ + enableThrottle: s.options.EnableRateLimiter(), + enableShadowMode: s.options.EnableRateLimiterShadowMode(), + }) case <-s.shutdownChan: return } @@ -285,8 +306,8 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) flattenWeightedChannelsLo WeightedChannel: weightedChan, key: channelKey, rateLimitRequest: s.options.ChannelQuotaRequestFn(channelKey), + metricsTags: s.options.TaskChannelMetricTagsFn(channelKey), throttled: false, - moreTasks: false, }) } sort.Slice(weightedChannels, func(i, j int) bool { @@ -311,17 +332,14 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) channels() iwrrChannels[T } func (s *InterleavedWeightedRoundRobinScheduler[T, K]) setupDispatchTimer() { - throttleDuration := iwrrMinDispatchThrottleDuration + - backoff.FullJitter(s.options.MaxDispatchThrottleDuration-iwrrMinDispatchThrottleDuration) - s.dispatchTimerLock.Lock() defer s.dispatchTimerLock.Unlock() if s.dispatchTimer != nil { - s.dispatchTimer.Stop() + return } - s.dispatchTimer = time.AfterFunc(throttleDuration, func() { + s.dispatchTimer = time.AfterFunc(s.options.DispatchThrottleDuration(), func() { s.dispatchTimerLock.Lock() defer s.dispatchTimerLock.Unlock() @@ -379,16 +397,18 @@ LoopDispatch: } iwrrChannels := s.channels() - enableRateLimiter := s.isRateLimiterEnabled() - s.doDispatchTasksWithWeight(iwrrChannels, enableRateLimiter) + rateLimiterConfig := s.getRateLimiterConfig() + s.doDispatchTasksWithWeight(iwrrChannels, rateLimiterConfig) - if !enableRateLimiter { + if !rateLimiterConfig.enableThrottle { continue LoopDispatch } - // rate limiter enabled + // rate limiter throttled enabled + // we only want to perform next round of dispatch if there are tasks in non-throttled channel. + // // all channels = throttled channels + not throttled but has more task + not throttled and no more task - // - If there's channel that's not throttled but has more task, need to trigger next round + // - If there's channel that's not throttled and has more task, need to trigger next round // of dispatch immediately. // - Otherwise all channels = throttled channels + not throttled and no more task // then as long as there's throttled channel, need to set a timer to try dispatch later @@ -399,7 +419,7 @@ LoopDispatch: numThrottled++ continue } - if channel.moreTasks { + if len(channel.Chan()) > 0 { // there's channel that is not throttled and may have more tasks // start a new round of dispatch immediately continue LoopDispatch @@ -416,20 +436,20 @@ LoopDispatch: func (s *InterleavedWeightedRoundRobinScheduler[T, K]) doDispatchTasksWithWeight( iwrrChannels iwrrChannels[T, K], - enableRateLimiter bool, + rateLimiterConfig rateLimiterConfig, ) { + for _, channel := range iwrrChannels.channels { + channel.throttled = false + } + rateLimiter := quotas.NoopRequestRateLimiter - if enableRateLimiter { + if rateLimiterConfig.enableThrottle || rateLimiterConfig.enableShadowMode { rateLimiter = s.rateLimiter - for _, channel := range iwrrChannels.channels { - channel.throttled = false - channel.moreTasks = false - } } numFlattenedChannels := len(iwrrChannels.flattenedChannels) startIdx := rand.Intn(numFlattenedChannels) - numTasks := int64(0) + taskDispatched := int64(0) numThrottled := 0 LoopDispatch: for i := 0; i != numFlattenedChannels; i++ { @@ -439,44 +459,57 @@ LoopDispatch: continue LoopDispatch } - now := s.timeSource.Now() - reservation := rateLimiter.Reserve( - now, - channel.rateLimitRequest, - ) - if reservation.DelayFrom(now) != 0 { - reservation.CancelAt(now) - channel.throttled = true - numThrottled++ - if numThrottled == len(iwrrChannels.channels) { - // all channels throttled - break LoopDispatch - } + if len(channel.Chan()) == 0 { continue LoopDispatch } + + if !rateLimiter.Allow(s.timeSource.Now(), channel.rateLimitRequest) { + s.metricsHandler.Counter(metrics.TaskSchedulerThrottled.GetMetricName()).Record(1, channel.metricsTags...) + + if rateLimiterConfig.enableThrottle { + channel.throttled = true + numThrottled++ + if numThrottled == len(iwrrChannels.channels) { + // all channels throttled + break LoopDispatch + } + continue LoopDispatch + } + + // throttled, but in shadow mode, do not actually throttle task dispatching + } + select { case task := <-channel.Chan(): s.fifoScheduler.Submit(task) - numTasks++ - channel.moreTasks = true + taskDispatched++ default: - reservation.CancelAt(now) - channel.moreTasks = false continue LoopDispatch } } - atomic.AddInt64(&s.numInflightTask, -numTasks) + atomic.AddInt64(&s.numInflightTask, -taskDispatched) } func (s *InterleavedWeightedRoundRobinScheduler[T, K]) tryDispatchTaskDirectly( channelKey K, task T, ) bool { - if s.isRateLimiterEnabled() && !s.rateLimiter.Allow( - s.timeSource.Now(), - s.options.ChannelQuotaRequestFn(channelKey), - ) { - return false + rateLimiterConfig := s.getRateLimiterConfig() + if rateLimiterConfig.enableThrottle || rateLimiterConfig.enableShadowMode { + if !s.rateLimiter.Allow( + s.timeSource.Now(), + s.options.ChannelQuotaRequestFn(channelKey), + ) { + s.metricsHandler.Counter(metrics.TaskSchedulerThrottled.GetMetricName()).Record(1, s.options.TaskChannelMetricTagsFn(channelKey)...) + + if rateLimiterConfig.enableThrottle { + return false + } + + // throttled, but in shadow mode, continue to dispatch + } + + // not throttled, continue to dispatch } dispatched := s.fifoScheduler.TrySubmit(task) @@ -511,8 +544,8 @@ DrainLoop: atomic.AddInt64(&s.numInflightTask, -numTasks) } -func (s *InterleavedWeightedRoundRobinScheduler[T, K]) isRateLimiterEnabled() bool { - return s.rateLimiterEnabled.Load().(bool) +func (s *InterleavedWeightedRoundRobinScheduler[T, K]) getRateLimiterConfig() rateLimiterConfig { + return s.rateLimiterConfig.Load().(rateLimiterConfig) } func (s *InterleavedWeightedRoundRobinScheduler[T, K]) isStopped() bool { diff --git a/common/tasks/interleaved_weighted_round_robin_test.go b/common/tasks/interleaved_weighted_round_robin_test.go index 0f4ca7ea626..89a191de774 100644 --- a/common/tasks/interleaved_weighted_round_robin_test.go +++ b/common/tasks/interleaved_weighted_round_robin_test.go @@ -38,6 +38,7 @@ import ( "go.temporal.io/server/common/clock" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/quotas" ) @@ -90,11 +91,13 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) SetupTest() { s.scheduler = NewInterleavedWeightedRoundRobinScheduler( InterleavedWeightedRoundRobinSchedulerOptions[*testTask, int]{ - TaskChannelKeyFn: func(task *testTask) int { return task.channelKey }, - ChannelWeightFn: func(key int) int { return s.channelKeyToWeight[key] }, - ChannelWeightUpdateCh: s.channelWeightUpdateCh, - ChannelQuotaRequestFn: func(key int) quotas.Request { return quotas.NewRequest("", 1, "", "", "") }, - EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true), + TaskChannelKeyFn: func(task *testTask) int { return task.channelKey }, + ChannelWeightFn: func(key int) int { return s.channelKeyToWeight[key] }, + ChannelWeightUpdateCh: s.channelWeightUpdateCh, + ChannelQuotaRequestFn: func(key int) quotas.Request { return quotas.NewRequest("", 1, "", "", "") }, + TaskChannelMetricTagsFn: func(key int) []metrics.Tag { return nil }, + EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true), + EnableRateLimiterShadowMode: dynamicconfig.GetBoolPropertyFn(false), }, Scheduler[*testTask](s.mockFIFOScheduler), quotas.NewRequestRateLimiterAdapter( @@ -104,6 +107,7 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) SetupTest() { ), clock.NewRealTimeSource(), logger, + metrics.NoopMetricsHandler, ) } diff --git a/service/history/archival_queue_factory.go b/service/history/archival_queue_factory.go index 95caa5285dd..e6e2dc391e5 100644 --- a/service/history/archival_queue_factory.go +++ b/service/history/archival_queue_factory.go @@ -27,6 +27,7 @@ package history import ( "go.uber.org/fx" + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" @@ -95,14 +96,14 @@ func newScheduler(params ArchivalQueueFactoryParams) queues.Scheduler { queues.PrioritySchedulerOptions{ WorkerCount: params.Config.ArchivalProcessorSchedulerWorkerCount, EnableRateLimiter: params.Config.TaskSchedulerEnableRateLimiter, - MaxDispatchThrottleDuration: HostSchedulerMaxDispatchThrottleDuration, - Weight: func() map[string]any { - return ArchivalTaskPriorities - }, + EnableRateLimiterShadowMode: params.Config.TaskSchedulerEnableRateLimiterShadowMode, + DispatchThrottleDuration: params.Config.TaskSchedulerThrottleDuration, + Weight: dynamicconfig.GetMapPropertyFn(ArchivalTaskPriorities), }, params.SchedulerRateLimiter, params.TimeSource, params.Logger, + params.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationArchivalQueueProcessorScope)), ) } @@ -140,7 +141,7 @@ func (f *archivalQueueFactory) newArchivalTaskExecutor(shard shard.Context, work workflowCache, f.RelocatableAttributesFetcher, f.MetricsHandler, - f.Logger, + log.With(shard.GetLogger(), tag.ComponentArchivalQueue), ) } diff --git a/service/history/archival_queue_factory_test.go b/service/history/archival_queue_factory_test.go index b46c2727936..c5754cf8508 100644 --- a/service/history/archival_queue_factory_test.go +++ b/service/history/archival_queue_factory_test.go @@ -49,7 +49,7 @@ func TestArchivalQueueFactory(t *testing.T) { assert.Equal(t, metrics.OperationTagName, tags[0].Key()) assert.Equal(t, "ArchivalQueueProcessor", tags[0].Value()) return metricsHandler - }) + }).Times(2) shardContext := shard.NewMockContext(ctrl) shardContext.EXPECT().GetLogger().Return(log.NewNoopLogger()) shardContext.EXPECT().GetQueueState(tasks.CategoryArchival).Return(&persistence.QueueState{ diff --git a/service/history/archival_queue_task_executor.go b/service/history/archival_queue_task_executor.go index 4857da98fce..e6bf08c62b7 100644 --- a/service/history/archival_queue_task_executor.go +++ b/service/history/archival_queue_task_executor.go @@ -88,6 +88,9 @@ func (e *archivalQueueTaskExecutor) Execute( tags = []metrics.Tag{ getNamespaceTagByID(e.shardContext.GetNamespaceRegistry(), task.GetNamespaceID()), metrics.TaskTypeTag(taskType), + // OperationTag is for consistency on tags with other executors, + // since those tags will be used to emit a common set of metrics. + metrics.OperationTag(taskType), } switch task := task.(type) { case *tasks.ArchiveExecutionTask: diff --git a/service/history/configs/config.go b/service/history/configs/config.go index ec9c14f276f..b0823fbf43e 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -91,9 +91,11 @@ type Config struct { QueuePendingTaskMaxCount dynamicconfig.IntPropertyFn QueueMaxReaderCount dynamicconfig.IntPropertyFn - TaskSchedulerEnableRateLimiter dynamicconfig.BoolPropertyFn - TaskSchedulerMaxQPS dynamicconfig.IntPropertyFn - TaskSchedulerNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter + TaskSchedulerEnableRateLimiter dynamicconfig.BoolPropertyFn + TaskSchedulerEnableRateLimiterShadowMode dynamicconfig.BoolPropertyFn + TaskSchedulerThrottleDuration dynamicconfig.DurationPropertyFn + TaskSchedulerMaxQPS dynamicconfig.IntPropertyFn + TaskSchedulerNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter // TimerQueueProcessor settings TimerTaskHighPriorityRPS dynamicconfig.IntPropertyFnWithNamespaceFilter @@ -340,9 +342,11 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis QueuePendingTaskMaxCount: dc.GetIntProperty(dynamicconfig.QueuePendingTaskMaxCount, 10000), QueueMaxReaderCount: dc.GetIntProperty(dynamicconfig.QueueMaxReaderCount, 2), - TaskSchedulerEnableRateLimiter: dc.GetBoolProperty(dynamicconfig.TaskSchedulerEnableRateLimiter, false), - TaskSchedulerMaxQPS: dc.GetIntProperty(dynamicconfig.TaskSchedulerMaxQPS, 0), - TaskSchedulerNamespaceMaxQPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.TaskSchedulerNamespaceMaxQPS, 0), + TaskSchedulerEnableRateLimiter: dc.GetBoolProperty(dynamicconfig.TaskSchedulerEnableRateLimiter, false), + TaskSchedulerEnableRateLimiterShadowMode: dc.GetBoolProperty(dynamicconfig.TaskSchedulerEnableRateLimiterShadowMode, true), + TaskSchedulerThrottleDuration: dc.GetDurationProperty(dynamicconfig.TaskSchedulerThrottleDuration, time.Second), + TaskSchedulerMaxQPS: dc.GetIntProperty(dynamicconfig.TaskSchedulerMaxQPS, 0), + TaskSchedulerNamespaceMaxQPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.TaskSchedulerNamespaceMaxQPS, 0), TimerTaskBatchSize: dc.GetIntProperty(dynamicconfig.TimerTaskBatchSize, 100), TimerProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.TimerProcessorSchedulerWorkerCount, 512), diff --git a/service/history/queueFactoryBase.go b/service/history/queueFactoryBase.go index 66cf29bc7a3..eff179f6a3e 100644 --- a/service/history/queueFactoryBase.go +++ b/service/history/queueFactoryBase.go @@ -26,7 +26,6 @@ package history import ( "context" - "time" "go.uber.org/fx" @@ -45,13 +44,6 @@ import ( wcache "go.temporal.io/server/service/history/workflow/cache" ) -const ( - QueueFactoryFxGroup = "queueFactory" - - HostSchedulerMaxDispatchThrottleDuration = 3 * time.Second - ShardSchedulerMaxDispatchThrottleDuration = 5 * time.Second -) - type ( QueueFactory interface { common.Daemon diff --git a/service/history/queues/queue_scheduled_test.go b/service/history/queues/queue_scheduled_test.go index f46dbe153cf..3b411f7d267 100644 --- a/service/history/queues/queue_scheduled_test.go +++ b/service/history/queues/queue_scheduled_test.go @@ -84,8 +84,9 @@ func (s *scheduledQueueSuite) SetupTest() { scheduler := NewPriorityScheduler( PrioritySchedulerOptions{ - WorkerCount: dynamicconfig.GetIntPropertyFn(10), - EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true), + WorkerCount: dynamicconfig.GetIntPropertyFn(10), + EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true), + EnableRateLimiterShadowMode: dynamicconfig.GetBoolPropertyFn(true), }, NewSchedulerRateLimiter( s.mockShard.GetConfig().TaskSchedulerNamespaceMaxQPS, @@ -95,6 +96,7 @@ func (s *scheduledQueueSuite) SetupTest() { ), s.mockShard.GetTimeSource(), log.NewTestLogger(), + metrics.NoopMetricsHandler, ) rescheduler := NewRescheduler( scheduler, diff --git a/service/history/queues/scheduler.go b/service/history/queues/scheduler.go index b0af917f734..06b0b0d8d63 100644 --- a/service/history/queues/scheduler.go +++ b/service/history/queues/scheduler.go @@ -27,8 +27,6 @@ package queues import ( - "time" - "go.temporal.io/server/common" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/dynamicconfig" @@ -78,14 +76,16 @@ type ( ActiveNamespaceWeights dynamicconfig.MapPropertyFnWithNamespaceFilter StandbyNamespaceWeights dynamicconfig.MapPropertyFnWithNamespaceFilter EnableRateLimiter dynamicconfig.BoolPropertyFn - MaxDispatchThrottleDuration time.Duration + EnableRateLimiterShadowMode dynamicconfig.BoolPropertyFn + DispatchThrottleDuration dynamicconfig.DurationPropertyFn } PrioritySchedulerOptions struct { WorkerCount dynamicconfig.IntPropertyFn Weight dynamicconfig.MapPropertyFn EnableRateLimiter dynamicconfig.BoolPropertyFn - MaxDispatchThrottleDuration time.Duration + EnableRateLimiterShadowMode dynamicconfig.BoolPropertyFn + DispatchThrottleDuration dynamicconfig.DurationPropertyFn } schedulerImpl struct { @@ -145,6 +145,13 @@ func NewNamespacePriorityScheduler( } return quotas.NewRequest("", taskSchedulerToken, namespaceName.String(), tasks.PriorityName[key.Priority], "") } + taskChannelMetricsTagsFn := func(key TaskChannelKey) []metrics.Tag { + namespaceName, _ := namespaceRegistry.GetNamespaceName(namespace.ID(key.NamespaceID)) + return []metrics.Tag{ + metrics.NamespaceTag(string(namespaceName)), + metrics.TaskPriorityTag(key.Priority.String()), + } + } fifoSchedulerOptions := &tasks.FIFOSchedulerOptions{ QueueSize: prioritySchedulerProcessorQueueSize, WorkerCount: options.WorkerCount, @@ -157,8 +164,10 @@ func NewNamespacePriorityScheduler( ChannelWeightFn: channelWeightFn, ChannelWeightUpdateCh: channelWeightUpdateCh, ChannelQuotaRequestFn: channelQuotaRequestFn, + TaskChannelMetricTagsFn: taskChannelMetricsTagsFn, EnableRateLimiter: options.EnableRateLimiter, - MaxDispatchThrottleDuration: options.MaxDispatchThrottleDuration, + EnableRateLimiterShadowMode: options.EnableRateLimiterShadowMode, + DispatchThrottleDuration: options.DispatchThrottleDuration, }, tasks.Scheduler[Executable](tasks.NewFIFOScheduler[Executable]( newSchedulerMonitor( @@ -174,6 +183,7 @@ func NewNamespacePriorityScheduler( rateLimiter, timeSource, logger, + metricsHandler, ), namespaceRegistry: namespaceRegistry, taskChannelKeyFn: taskChannelKeyFn, @@ -189,6 +199,7 @@ func NewPriorityScheduler( rateLimiter SchedulerRateLimiter, timeSource clock.TimeSource, logger log.Logger, + metricsHandler metrics.Handler, ) Scheduler { taskChannelKeyFn := func(e Executable) TaskChannelKey { return TaskChannelKey{ @@ -209,6 +220,12 @@ func NewPriorityScheduler( channelQuotaRequestFn := func(key TaskChannelKey) quotas.Request { return quotas.NewRequest("", taskSchedulerToken, "", tasks.PriorityName[key.Priority], "") } + taskChannelMetricsTagsFn := func(key TaskChannelKey) []metrics.Tag { + return []metrics.Tag{ + metrics.NamespaceUnknownTag(), + metrics.TaskPriorityTag(key.Priority.String()), + } + } fifoSchedulerOptions := &tasks.FIFOSchedulerOptions{ QueueSize: prioritySchedulerProcessorQueueSize, WorkerCount: options.WorkerCount, @@ -221,8 +238,10 @@ func NewPriorityScheduler( ChannelWeightFn: channelWeightFn, ChannelWeightUpdateCh: nil, ChannelQuotaRequestFn: channelQuotaRequestFn, + TaskChannelMetricTagsFn: taskChannelMetricsTagsFn, EnableRateLimiter: options.EnableRateLimiter, - MaxDispatchThrottleDuration: options.MaxDispatchThrottleDuration, + EnableRateLimiterShadowMode: options.EnableRateLimiterShadowMode, + DispatchThrottleDuration: options.DispatchThrottleDuration, }, tasks.Scheduler[Executable](tasks.NewFIFOScheduler[Executable]( noopScheduleMonitor, @@ -232,6 +251,7 @@ func NewPriorityScheduler( rateLimiter, timeSource, logger, + metricsHandler, ), taskChannelKeyFn: taskChannelKeyFn, channelWeightFn: channelWeightFn, diff --git a/service/history/timerQueueFactory.go b/service/history/timerQueueFactory.go index d7225e452d7..d0556692a34 100644 --- a/service/history/timerQueueFactory.go +++ b/service/history/timerQueueFactory.go @@ -78,7 +78,8 @@ func NewTimerQueueFactory( ActiveNamespaceWeights: params.Config.TimerProcessorSchedulerActiveRoundRobinWeights, StandbyNamespaceWeights: params.Config.TimerProcessorSchedulerStandbyRoundRobinWeights, EnableRateLimiter: params.Config.TaskSchedulerEnableRateLimiter, - MaxDispatchThrottleDuration: HostSchedulerMaxDispatchThrottleDuration, + EnableRateLimiterShadowMode: params.Config.TaskSchedulerEnableRateLimiterShadowMode, + DispatchThrottleDuration: params.Config.TaskSchedulerThrottleDuration, }, params.NamespaceRegistry, params.SchedulerRateLimiter, diff --git a/service/history/transferQueueFactory.go b/service/history/transferQueueFactory.go index 5536bad8530..39cc984e2e2 100644 --- a/service/history/transferQueueFactory.go +++ b/service/history/transferQueueFactory.go @@ -80,7 +80,8 @@ func NewTransferQueueFactory( ActiveNamespaceWeights: params.Config.TransferProcessorSchedulerActiveRoundRobinWeights, StandbyNamespaceWeights: params.Config.TransferProcessorSchedulerStandbyRoundRobinWeights, EnableRateLimiter: params.Config.TaskSchedulerEnableRateLimiter, - MaxDispatchThrottleDuration: HostSchedulerMaxDispatchThrottleDuration, + EnableRateLimiterShadowMode: params.Config.TaskSchedulerEnableRateLimiterShadowMode, + DispatchThrottleDuration: params.Config.TaskSchedulerThrottleDuration, }, params.NamespaceRegistry, params.SchedulerRateLimiter, diff --git a/service/history/visibilityQueueFactory.go b/service/history/visibilityQueueFactory.go index fc41afd57c7..50fdece8f85 100644 --- a/service/history/visibilityQueueFactory.go +++ b/service/history/visibilityQueueFactory.go @@ -69,7 +69,8 @@ func NewVisibilityQueueFactory( ActiveNamespaceWeights: params.Config.VisibilityProcessorSchedulerActiveRoundRobinWeights, StandbyNamespaceWeights: params.Config.VisibilityProcessorSchedulerStandbyRoundRobinWeights, EnableRateLimiter: params.Config.TaskSchedulerEnableRateLimiter, - MaxDispatchThrottleDuration: HostSchedulerMaxDispatchThrottleDuration, + EnableRateLimiterShadowMode: params.Config.TaskSchedulerEnableRateLimiterShadowMode, + DispatchThrottleDuration: params.Config.TaskSchedulerThrottleDuration, }, params.NamespaceRegistry, params.SchedulerRateLimiter, From 087872e623ff5c038993cececda7a092f4db0535 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 31 Jan 2023 18:50:36 -0800 Subject: [PATCH 2/4] fix tests --- service/history/archival_queue_factory_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/history/archival_queue_factory_test.go b/service/history/archival_queue_factory_test.go index c5754cf8508..750e4427188 100644 --- a/service/history/archival_queue_factory_test.go +++ b/service/history/archival_queue_factory_test.go @@ -51,7 +51,7 @@ func TestArchivalQueueFactory(t *testing.T) { return metricsHandler }).Times(2) shardContext := shard.NewMockContext(ctrl) - shardContext.EXPECT().GetLogger().Return(log.NewNoopLogger()) + shardContext.EXPECT().GetLogger().Return(log.NewNoopLogger()).Times(2) shardContext.EXPECT().GetQueueState(tasks.CategoryArchival).Return(&persistence.QueueState{ ReaderStates: nil, ExclusiveReaderHighWatermark: &persistence.TaskKey{ From d4d2d8a1448a4c1b9c450295ed39323261bded01 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Fri, 3 Feb 2023 10:55:35 -0800 Subject: [PATCH 3/4] use reserve in try dispatch --- common/tasks/interleaved_weighted_round_robin.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/common/tasks/interleaved_weighted_round_robin.go b/common/tasks/interleaved_weighted_round_robin.go index 67b3f59cb16..10f8290dfe8 100644 --- a/common/tasks/interleaved_weighted_round_robin.go +++ b/common/tasks/interleaved_weighted_round_robin.go @@ -495,11 +495,16 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) tryDispatchTaskDirectly( task T, ) bool { rateLimiterConfig := s.getRateLimiterConfig() + now := s.timeSource.Now() + reservation := quotas.NoopReservation if rateLimiterConfig.enableThrottle || rateLimiterConfig.enableShadowMode { - if !s.rateLimiter.Allow( - s.timeSource.Now(), + reservation = s.rateLimiter.Reserve( + now, s.options.ChannelQuotaRequestFn(channelKey), - ) { + ) + + if reservation.DelayFrom(now) != 0 { + reservation.CancelAt(now) s.metricsHandler.Counter(metrics.TaskSchedulerThrottled.GetMetricName()).Record(1, s.options.TaskChannelMetricTagsFn(channelKey)...) if rateLimiterConfig.enableThrottle { @@ -515,6 +520,8 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) tryDispatchTaskDirectly( dispatched := s.fifoScheduler.TrySubmit(task) if dispatched { atomic.AddInt64(&s.numInflightTask, -1) + } else { + reservation.CancelAt(now) } return dispatched } From d921d1ae092137510d35307119088e68768a1682 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Fri, 3 Feb 2023 16:21:14 -0800 Subject: [PATCH 4/4] rebase --- service/history/queueFactoryBase.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/service/history/queueFactoryBase.go b/service/history/queueFactoryBase.go index 7dedce7ca93..37acc1ef785 100644 --- a/service/history/queueFactoryBase.go +++ b/service/history/queueFactoryBase.go @@ -45,6 +45,8 @@ import ( wcache "go.temporal.io/server/service/history/workflow/cache" ) +const QueueFactoryFxGroup = "queueFactory" + type ( QueueFactory interface { common.Daemon