From 4577945394a584378252c9372f23614eb9cfd317 Mon Sep 17 00:00:00 2001 From: Casie Chen Date: Wed, 14 Aug 2024 15:19:30 -0700 Subject: [PATCH 1/5] Create flag to prioritize dequeuing from query components before tenant fairness --- pkg/frontend/v1/frontend.go | 1 + pkg/scheduler/queue/queue.go | 3 +- pkg/scheduler/queue/queue_test.go | 53 ++++++++++++++++------- pkg/scheduler/queue/tenant_queues.go | 29 ++++++++++--- pkg/scheduler/queue/tenant_queues_test.go | 50 +++++++++++++++------ pkg/scheduler/scheduler.go | 8 ++++ 6 files changed, 107 insertions(+), 37 deletions(-) diff --git a/pkg/frontend/v1/frontend.go b/pkg/frontend/v1/frontend.go index 9f1db99a0fd..e4247fb8361 100644 --- a/pkg/frontend/v1/frontend.go +++ b/pkg/frontend/v1/frontend.go @@ -128,6 +128,7 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist log, cfg.MaxOutstandingPerTenant, false, + false, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests, diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index 951f1f2ca65..b6c21f6c2d5 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -238,6 +238,7 @@ func NewRequestQueue( log log.Logger, maxOutstandingPerTenant int, useMultiAlgoQueue bool, + prioritizeQueryComponents bool, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec, @@ -273,7 +274,7 @@ func NewRequestQueue( waitingQuerierConnsToDispatch: list.New(), QueryComponentUtilization: queryComponentCapacity, - queueBroker: newQueueBroker(maxOutstandingPerTenant, useMultiAlgoQueue, forgetDelay), + queueBroker: newQueueBroker(maxOutstandingPerTenant, useMultiAlgoQueue, prioritizeQueryComponents, forgetDelay), } q.Service = services.NewBasicService(q.starting, q.running, q.stop).WithName("request queue") diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index 40bb573caae..876be95384b 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -28,17 +28,20 @@ import ( util_test "github.com/grafana/mimir/pkg/util/test" ) -// // TODO (casie): Write tests for prioritizeQueryComponents is true +// buildTreeTestsStruct returns all _allowed_ combinations of config flags for testing. func buildTreeTestsStruct() []struct { - name string - useMultiAlgoTreeQueue bool + name string + useMultiAlgoTreeQueue bool + prioritizeQueryComponents bool } { return []struct { - name string - useMultiAlgoTreeQueue bool + name string + useMultiAlgoTreeQueue bool + prioritizeQueryComponents bool }{ - {"legacy tree queue", false}, - {"integrated tree queue", true}, + {"legacy tree queue with prioritize query components disabled", false, false}, + {"integrated tree queue with prioritize query components disabled", true, false}, + {"integrated tree queue with prioritize query components enabled", true, true}, } } @@ -107,9 +110,8 @@ func makeSchedulerRequest(tenantID string, additionalQueueDimensions []string) * // round-robins between the multiple queues, which has the effect of alternately dequeuing from the slow queries // and normal queries rather than blocking normal queries behind slow queries. func TestMultiDimensionalQueueFairnessSlowConsumerEffects(t *testing.T) { - treeTypes := buildTreeTestsStruct() - - for _, tt := range treeTypes { + for _, tt := range buildTreeTestsStruct() { + // Only test allowed combinations of these configs t.Run(tt.name, func(t *testing.T) { promRegistry := prometheus.NewPedanticRegistry() @@ -153,6 +155,7 @@ func TestMultiDimensionalQueueFairnessSlowConsumerEffects(t *testing.T) { log.NewNopLogger(), maxOutstandingRequestsPerTenant, tt.useMultiAlgoTreeQueue, + tt.prioritizeQueryComponents, forgetQuerierDelay, promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), @@ -264,6 +267,7 @@ func BenchmarkConcurrentQueueOperations(b *testing.B) { log.NewNopLogger(), maxOutstandingRequestsPerTenant, t.useMultiAlgoTreeQueue, + t.prioritizeQueryComponents, forgetQuerierDelay, promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), @@ -450,6 +454,7 @@ func TestRequestQueue_RegisterAndUnregisterQuerierWorkerConnections(t *testing.T log.NewNopLogger(), 1, tt.useMultiAlgoTreeQueue, + tt.prioritizeQueryComponents, forgetDelay, promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), @@ -543,6 +548,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe log.NewNopLogger(), 1, tt.useMultiAlgoTreeQueue, + tt.prioritizeQueryComponents, forgetDelay, promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), @@ -622,6 +628,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ReshardNotifiedCorrectlyForMultip log.NewNopLogger(), 1, tt.useMultiAlgoTreeQueue, + tt.prioritizeQueryComponents, forgetDelay, promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), @@ -717,6 +724,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnAfterContextCancelled log.NewNopLogger(), 1, tt.useMultiAlgoTreeQueue, + tt.prioritizeQueryComponents, forgetDelay, promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), @@ -777,6 +785,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnImmediatelyIfQuerierI log.NewNopLogger(), 1, tt.useMultiAlgoTreeQueue, + tt.prioritizeQueryComponents, forgetDelay, promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), @@ -814,6 +823,7 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend log.NewNopLogger(), 1, tt.useMultiAlgoTreeQueue, + tt.prioritizeQueryComponents, forgetDelay, promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), @@ -824,14 +834,15 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend // bypassing queue dispatcher loop for direct usage of the queueBroker and // passing a waitingQuerierConn for a canceled querier connection - queueBroker := newQueueBroker(queue.maxOutstandingPerTenant, false, queue.forgetDelay) + queueBroker := newQueueBroker(queue.maxOutstandingPerTenant, tt.useMultiAlgoTreeQueue, tt.prioritizeQueryComponents, queue.forgetDelay) queueBroker.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), querierID)) tenantMaxQueriers := 0 // no sharding + queueDim := randAdditionalQueueDimension(true) req := &SchedulerRequest{ Ctx: context.Background(), Request: &httpgrpc.HTTPRequest{Method: "GET", Url: "/hello"}, - AdditionalQueueDimensions: randAdditionalQueueDimension(true), + AdditionalQueueDimensions: queueDim, } tr := tenantRequest{ tenantID: TenantID("tenant-1"), @@ -844,9 +855,20 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend require.NoError(t, queueBroker.enqueueRequestBack(&tr, tenantMaxQueriers)) require.False(t, tq.getNode(QueuePath{"tenant-1"}).IsEmpty()) } else if itq, ok := queueBroker.tree.(*MultiQueuingAlgorithmTreeQueue); ok { - require.Nil(t, itq.GetNode(QueuePath{"tenant-1"})) + var path QueuePath + if queueDim == nil { + queueDim = []string{unknownQueueDimension} + } + switch queueBroker.prioritizeQueryComponents { + case true: + path = append(append(path, queueDim...), "tenant-1") + default: + path = append([]string{"tenant-1"}, queueDim...) + } + require.Nil(t, itq.GetNode(path)) require.NoError(t, queueBroker.enqueueRequestBack(&tr, tenantMaxQueriers)) - require.False(t, itq.GetNode(QueuePath{"tenant-1"}).IsEmpty()) + require.False(t, itq.GetNode(path).IsEmpty()) + } ctx, cancel := context.WithCancel(context.Background()) @@ -866,7 +888,8 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend if tq, ok := queueBroker.tree.(*TreeQueue); ok { require.False(t, tq.getNode(QueuePath{"tenant-1"}).IsEmpty()) } else if itq, ok := queueBroker.tree.(*MultiQueuingAlgorithmTreeQueue); ok { - require.False(t, itq.GetNode(QueuePath{"tenant-1"}).IsEmpty()) + path := queueBroker.makeQueuePathForTests("tenant-1") + require.False(t, itq.GetNode(path).IsEmpty()) } }) diff --git a/pkg/scheduler/queue/tenant_queues.go b/pkg/scheduler/queue/tenant_queues.go index d446e4cf450..4994d10b7d1 100644 --- a/pkg/scheduler/queue/tenant_queues.go +++ b/pkg/scheduler/queue/tenant_queues.go @@ -36,20 +36,34 @@ type queueBroker struct { func newQueueBroker( maxTenantQueueSize int, useMultiAlgoTreeQueue bool, + prioritizeQueryComponents bool, forgetDelay time.Duration, ) *queueBroker { tqas := newTenantQuerierAssignments(forgetDelay) var tree Tree var err error if useMultiAlgoTreeQueue { - algos := []QueuingAlgorithm{ - tqas, // root; QueuingAlgorithm selects tenants - &roundRobinState{}, // tenant queues; QueuingAlgorithm selects query component - &roundRobinState{}, // query components; QueuingAlgorithm selects query from local queue + var algos []QueuingAlgorithm + if prioritizeQueryComponents { + algos = []QueuingAlgorithm{ + &roundRobinState{}, // root; QueuingAlgorithm selects query component + tqas, // query components; QueuingAlgorithm selects tenants + &roundRobinState{}, // tenant queues; QueuingAlgorithm selects from local queue + + } + } else { + algos = []QueuingAlgorithm{ + tqas, // root; QueuingAlgorithm selects tenants + &roundRobinState{}, // tenant queues; QueuingAlgorithm selects query component + &roundRobinState{}, // query components; QueuingAlgorithm selects query from local queue + } } tree, err = NewTree(algos...) } else { // by default, use the legacy tree queue + if prioritizeQueryComponents { + panic("cannot prioritize query components for legacy tree queue") + } tree = NewTreeQueue("root") } @@ -58,9 +72,10 @@ func newQueueBroker( panic(fmt.Sprintf("error creating the tree queue: %v", err)) } qb := &queueBroker{ - tree: tree, - tenantQuerierAssignments: tqas, - maxTenantQueueSize: maxTenantQueueSize, + tree: tree, + tenantQuerierAssignments: tqas, + maxTenantQueueSize: maxTenantQueueSize, + prioritizeQueryComponents: prioritizeQueryComponents, } return qb diff --git a/pkg/scheduler/queue/tenant_queues_test.go b/pkg/scheduler/queue/tenant_queues_test.go index faac03f4724..57ec7e44e66 100644 --- a/pkg/scheduler/queue/tenant_queues_test.go +++ b/pkg/scheduler/queue/tenant_queues_test.go @@ -82,7 +82,7 @@ func TestQueues_NoShuffleSharding(t *testing.T) { treeTypes := buildTreeTestsStruct() for _, tt := range treeTypes { t.Run(tt.name, func(t *testing.T) { - qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, 0) + qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, tt.prioritizeQueryComponents, 0) assert.NotNil(t, qb) assert.NoError(t, isConsistent(qb)) @@ -217,14 +217,12 @@ func TestQueues_NoShuffleSharding(t *testing.T) { } func TestQueuesRespectMaxTenantQueueSizeWithSubQueues(t *testing.T) { - // TODO (casie): When implementing prioritizeQueryComponents, add tests here, since we will be able to have - // multiple nodes for a single tenant treeTypes := buildTreeTestsStruct() for _, tt := range treeTypes { t.Run(tt.name, func(t *testing.T) { maxTenantQueueSize := 100 - qb := newQueueBroker(maxTenantQueueSize, tt.useMultiAlgoTreeQueue, 0) + qb := newQueueBroker(maxTenantQueueSize, tt.useMultiAlgoTreeQueue, tt.prioritizeQueryComponents, 0) additionalQueueDimensions := map[int][]string{ 0: nil, 1: {"ingester"}, @@ -254,17 +252,38 @@ func TestQueuesRespectMaxTenantQueueSizeWithSubQueues(t *testing.T) { if tq, ok := qb.tree.(*TreeQueue); ok { assert.Equal(t, maxTenantQueueSize, tq.getNode(queuePath).ItemCount()) } else if itq, ok := qb.tree.(*MultiQueuingAlgorithmTreeQueue); ok { - assert.Equal(t, maxTenantQueueSize, itq.GetNode(queuePath).ItemCount()) + var itemCount int + // if prioritizeQueryComponents, we need to build paths for each queue dimension + // and sum all items + if qb.prioritizeQueryComponents { + for _, addlQueueDim := range additionalQueueDimensions { + var path QueuePath + path = append(append(path, addlQueueDim...), "tenant-1") + if addlQueueDim == nil { + path = qb.makeQueuePathForTests("tenant-1") + } + itemCount += itq.GetNode(path).ItemCount() + } + assert.Equal(t, maxTenantQueueSize, itemCount) + + } else { + assert.Equal(t, maxTenantQueueSize, itq.GetNode(queuePath).ItemCount()) + } } - // assert equal distribution of queue items between tenant node and 3 subnodes + // assert equal distribution of queue items between 4 subnodes for _, v := range additionalQueueDimensions { var checkPath QueuePath if v == nil { - checkPath = append(QueuePath{"tenant-1"}, unknownQueueDimension) - } else { + v = []string{unknownQueueDimension} + } + switch qb.prioritizeQueryComponents { + case true: + checkPath = append(append(checkPath, v...), "tenant-1") + default: checkPath = append(QueuePath{"tenant-1"}, v...) } + // TODO (casie): After deprecating legacy tree queue, clean this up var itemCount int if tq, ok := qb.tree.(*TreeQueue); ok { @@ -308,7 +327,7 @@ func TestQueuesOnTerminatingQuerier(t *testing.T) { treeTypes := buildTreeTestsStruct() for _, tt := range treeTypes { t.Run(tt.name, func(t *testing.T) { - qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, 0) + qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, tt.prioritizeQueryComponents, 0) assert.NotNil(t, qb) assert.NoError(t, isConsistent(qb)) @@ -382,7 +401,7 @@ func TestQueues_QuerierDistribution(t *testing.T) { treeTypes := buildTreeTestsStruct() for _, tt := range treeTypes { t.Run(tt.name, func(t *testing.T) { - qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, 0) + qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, tt.prioritizeQueryComponents, 0) assert.NotNil(t, qb) assert.NoError(t, isConsistent(qb)) @@ -462,7 +481,7 @@ func TestQueuesConsistency(t *testing.T) { t.Run(tt.name, func(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, testData.forgetDelay) + qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, tt.prioritizeQueryComponents, testData.forgetDelay) assert.NotNil(t, qb) assert.NoError(t, isConsistent(qb)) @@ -523,7 +542,7 @@ func TestQueues_ForgetDelay(t *testing.T) { for _, tt := range treeTypes { t.Run(tt.name, func(t *testing.T) { now := time.Now() - qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, forgetDelay) + qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, tt.prioritizeQueryComponents, forgetDelay) assert.NotNil(t, qb) assert.NoError(t, isConsistent(qb)) @@ -636,7 +655,7 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget for _, tt := range treeTypes { t.Run(tt.name, func(t *testing.T) { now := time.Now() - qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, forgetDelay) + qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, tt.prioritizeQueryComponents, forgetDelay) assert.NotNil(t, qb) assert.NoError(t, isConsistent(qb)) @@ -764,7 +783,7 @@ func (qb *queueBroker) getOrAddTenantQueue(tenantID TenantID, maxQueriers int) e // removeTenantQueue is a test utility, not intended for use by consumers of queueBroker func (qb *queueBroker) removeTenantQueue(tenantID TenantID) bool { qb.tenantQuerierAssignments.removeTenant(tenantID) - queuePath := QueuePath{string(tenantID)} + queuePath := qb.makeQueuePathForTests(tenantID) // TODO (casie): When deprecating legacy tree queue, clean this up if tq, ok := qb.tree.(*TreeQueue); ok { @@ -811,6 +830,9 @@ func (n *Node) deleteNode(pathFromNode QueuePath) bool { } func (qb *queueBroker) makeQueuePathForTests(tenantID TenantID) QueuePath { + if qb.prioritizeQueryComponents { + return QueuePath{unknownQueueDimension, string(tenantID)} + } return QueuePath{string(tenantID)} } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 92abb821203..53843eb8716 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -8,6 +8,7 @@ package scheduler import ( "context" "flag" + "fmt" "io" "net/http" "strings" @@ -96,6 +97,7 @@ type Config struct { MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant"` AdditionalQueryQueueDimensionsEnabled bool `yaml:"additional_query_queue_dimensions_enabled" category:"experimental"` UseMultiAlgorithmQueryQueue bool `yaml:"use_multi_algorithm_query_queue" category:"experimental"` + PrioritizeQueryComponents bool `yaml:"prioritize_query_components" category:"experimental"` QuerierForgetDelay time.Duration `yaml:"querier_forget_delay" category:"experimental"` GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=This configures the gRPC client used to report errors back to the query-frontend."` @@ -106,6 +108,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.IntVar(&cfg.MaxOutstandingPerTenant, "query-scheduler.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429.") f.BoolVar(&cfg.AdditionalQueryQueueDimensionsEnabled, "query-scheduler.additional-query-queue-dimensions-enabled", false, "Non-operational: Enqueue query requests with additional queue dimensions to split tenant request queues into subqueues. This enables separate requests to proceed from a tenant's subqueues even when other subqueues are blocked on slow query requests. Must be set on both query-frontend and scheduler to take effect. (default false)") f.BoolVar(&cfg.UseMultiAlgorithmQueryQueue, "query-scheduler.use-multi-algorithm-query-queue", false, "Use an experimental version of the query queue which has the same behavior as the existing queue, but integrates tenant selection into the tree model.") + f.BoolVar(&cfg.PrioritizeQueryComponents, "query-scheduler.prioritize-query-components", false, "When enabled, the query scheduler will primarily prioritize dequeuing fairly from queue components, and secondarily prioritize dequeuing fairly across tenants. When disabled, tenant fairness is primarily prioritized. `query-scheduler.use-multi-algorithm-query-queue` must be enabled in order to use this flag.") f.DurationVar(&cfg.QuerierForgetDelay, "query-scheduler.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.") cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f) @@ -113,6 +116,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { } func (cfg *Config) Validate() error { + if cfg.PrioritizeQueryComponents && !cfg.UseMultiAlgorithmQueryQueue { + return fmt.Errorf("cannot enable query-scheduler.prioritize-query-components without query-scheduler.use-multi-algorithm-query-queue") + } + return cfg.ServiceDiscovery.Validate() } @@ -162,6 +169,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe s.log, cfg.MaxOutstandingPerTenant, cfg.UseMultiAlgorithmQueryQueue, + cfg.PrioritizeQueryComponents, cfg.QuerierForgetDelay, s.queueLength, s.discardedRequests, From 06708d99ee554729defc81bb8a3cdcf916253b8d Mon Sep 17 00:00:00 2001 From: Casie Chen Date: Thu, 15 Aug 2024 17:00:48 -0700 Subject: [PATCH 2/5] Update docs and CHANGELOG --- CHANGELOG.md | 1 + cmd/mimir/config-descriptor.json | 11 +++++++++++ cmd/mimir/help-all.txt.tmpl | 2 ++ .../mimir/configure/configuration-parameters/index.md | 8 ++++++++ 4 files changed, 22 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 269f4dd4bd2..629de86c60c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -67,6 +67,7 @@ * [ENHANCEMENT] Expose a new `s3.session-token` configuration option to enable using temporary security credentials. #8952 * [ENHANCEMENT] Make `-query-frontend.additional-query-queue-dimensions-enabled` and `-query-scheduler.additional-query-queue-dimensions-enabled` non-operational flags in preparation for removal. #8984 * [ENHANCEMENT] Add a new ingester endpoint to prepare instances to downscale. #8956 +* [ENHANCEMENT] Query-scheduler: Add `query-scheduler.prioritize-query-components` to allow dequeuing from specific query components. #9016 * [BUGFIX] Ruler: add support for draining any outstanding alert notifications before shutting down. This can be enabled with the `-ruler.drain-notification-queue-on-shutdown=true` CLI flag. #8346 * [BUGFIX] Query-frontend: fix `-querier.max-query-lookback` enforcement when `-compactor.blocks-retention-period` is not set, and viceversa. #8388 * [BUGFIX] Ingester: fix sporadic `not found` error causing an internal server error if label names are queried with matchers during head compaction. #8391 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index acd0672eb17..7b5a04c7b93 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -16033,6 +16033,17 @@ "fieldType": "boolean", "fieldCategory": "experimental" }, + { + "kind": "field", + "name": "prioritize_query_components", + "required": false, + "desc": "When enabled, the query scheduler will primarily prioritize dequeuing fairly from queue components, and secondarily prioritize dequeuing fairly across tenants. When disabled, tenant fairness is primarily prioritized. `query-scheduler.use-multi-algorithm-query-queue` must be enabled in order to use this flag.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "query-scheduler.prioritize-query-components", + "fieldType": "boolean", + "fieldCategory": "experimental" + }, { "kind": "field", "name": "querier_forget_delay", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 88cc200b5f8..d1ad53a57f0 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -2255,6 +2255,8 @@ Usage of ./cmd/mimir/mimir: Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429. (default 100) -query-scheduler.max-used-instances int The maximum number of query-scheduler instances to use, regardless how many replicas are running. This option can be set only when -query-scheduler.service-discovery-mode is set to 'ring'. 0 to use all available query-scheduler instances. + -query-scheduler.prioritize-query-components + [experimental] When enabled, the query scheduler will primarily prioritize dequeuing fairly from queue components, and secondarily prioritize dequeuing fairly across tenants. When disabled, tenant fairness is primarily prioritized. `query-scheduler.use-multi-algorithm-query-queue` must be enabled in order to use this flag. -query-scheduler.querier-forget-delay duration [experimental] If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled. -query-scheduler.ring.consul.acl-token string diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 88ea0318deb..b2a280d3b73 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -1722,6 +1722,14 @@ The `query_scheduler` block configures the query-scheduler. # CLI flag: -query-scheduler.use-multi-algorithm-query-queue [use_multi_algorithm_query_queue: | default = false] +# (experimental) When enabled, the query scheduler will primarily prioritize +# dequeuing fairly from queue components, and secondarily prioritize dequeuing +# fairly across tenants. When disabled, tenant fairness is primarily +# prioritized. `query-scheduler.use-multi-algorithm-query-queue` must be enabled +# in order to use this flag. +# CLI flag: -query-scheduler.prioritize-query-components +[prioritize_query_components: | default = false] + # (experimental) If a querier disconnects without sending notification about # graceful shutdown, the query-scheduler will keep the querier in the tenant's # shard until the forget delay has passed. This feature is useful to reduce the From 146e4f4221351f0b6d0c37b87c581d8eb22d1f7c Mon Sep 17 00:00:00 2001 From: Casie Chen Date: Fri, 16 Aug 2024 15:39:12 -0700 Subject: [PATCH 3/5] Address Franco's feedback --- CHANGELOG.md | 2 +- pkg/scheduler/queue/queue_test.go | 5 ++--- pkg/scheduler/queue/tenant_queues_test.go | 5 ++--- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 629de86c60c..26c72f688de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -67,7 +67,7 @@ * [ENHANCEMENT] Expose a new `s3.session-token` configuration option to enable using temporary security credentials. #8952 * [ENHANCEMENT] Make `-query-frontend.additional-query-queue-dimensions-enabled` and `-query-scheduler.additional-query-queue-dimensions-enabled` non-operational flags in preparation for removal. #8984 * [ENHANCEMENT] Add a new ingester endpoint to prepare instances to downscale. #8956 -* [ENHANCEMENT] Query-scheduler: Add `query-scheduler.prioritize-query-components` to allow dequeuing from specific query components. #9016 +* [ENHANCEMENT] Query-scheduler: Add `query-scheduler.prioritize-query-components` which, when enabled, will primarily prioritize dequeuing fairly across queue components, and secondarily prioritize dequeuing fairly across tenants. When disabled, tenant fairness is primarily prioritized. `query-scheduler.use-multi-algorithm-query-queue` must be enabled in order to use this flag. #9016 * [BUGFIX] Ruler: add support for draining any outstanding alert notifications before shutting down. This can be enabled with the `-ruler.drain-notification-queue-on-shutdown=true` CLI flag. #8346 * [BUGFIX] Query-frontend: fix `-querier.max-query-lookback` enforcement when `-compactor.blocks-retention-period` is not set, and viceversa. #8388 * [BUGFIX] Ingester: fix sporadic `not found` error causing an internal server error if label names are queried with matchers during head compaction. #8391 diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index 876be95384b..afd226b2811 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -859,10 +859,9 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend if queueDim == nil { queueDim = []string{unknownQueueDimension} } - switch queueBroker.prioritizeQueryComponents { - case true: + if queueBroker.prioritizeQueryComponents { path = append(append(path, queueDim...), "tenant-1") - default: + } else { path = append([]string{"tenant-1"}, queueDim...) } require.Nil(t, itq.GetNode(path)) diff --git a/pkg/scheduler/queue/tenant_queues_test.go b/pkg/scheduler/queue/tenant_queues_test.go index 57ec7e44e66..4efd6ff5a53 100644 --- a/pkg/scheduler/queue/tenant_queues_test.go +++ b/pkg/scheduler/queue/tenant_queues_test.go @@ -277,10 +277,9 @@ func TestQueuesRespectMaxTenantQueueSizeWithSubQueues(t *testing.T) { if v == nil { v = []string{unknownQueueDimension} } - switch qb.prioritizeQueryComponents { - case true: + if qb.prioritizeQueryComponents { checkPath = append(append(checkPath, v...), "tenant-1") - default: + } else { checkPath = append(QueuePath{"tenant-1"}, v...) } From b12b7cc158d6bd1466624804f0b3e9aaba8073c7 Mon Sep 17 00:00:00 2001 From: Casie Chen Date: Fri, 16 Aug 2024 16:42:53 -0700 Subject: [PATCH 4/5] Update docs --- cmd/mimir/config-descriptor.json | 2 +- cmd/mimir/help-all.txt.tmpl | 2 +- .../mimir/configure/configuration-parameters/index.md | 10 +++++----- pkg/scheduler/scheduler.go | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 7b5a04c7b93..0722493ed22 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -16037,7 +16037,7 @@ "kind": "field", "name": "prioritize_query_components", "required": false, - "desc": "When enabled, the query scheduler will primarily prioritize dequeuing fairly from queue components, and secondarily prioritize dequeuing fairly across tenants. When disabled, tenant fairness is primarily prioritized. `query-scheduler.use-multi-algorithm-query-queue` must be enabled in order to use this flag.", + "desc": "When enabled, the query scheduler primarily prioritizes dequeuing fairly from queue components and secondarily prioritizes dequeuing fairly across tenants. When disabled, the query scheduler primarily prioritizes tenant fairness. You must enable the `query-scheduler.use-multi-algorithm-query-queue` setting to use this flag.", "fieldValue": null, "fieldDefaultValue": false, "fieldFlag": "query-scheduler.prioritize-query-components", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index d1ad53a57f0..1b38fd204f7 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -2256,7 +2256,7 @@ Usage of ./cmd/mimir/mimir: -query-scheduler.max-used-instances int The maximum number of query-scheduler instances to use, regardless how many replicas are running. This option can be set only when -query-scheduler.service-discovery-mode is set to 'ring'. 0 to use all available query-scheduler instances. -query-scheduler.prioritize-query-components - [experimental] When enabled, the query scheduler will primarily prioritize dequeuing fairly from queue components, and secondarily prioritize dequeuing fairly across tenants. When disabled, tenant fairness is primarily prioritized. `query-scheduler.use-multi-algorithm-query-queue` must be enabled in order to use this flag. + [experimental] When enabled, the query scheduler primarily prioritizes dequeuing fairly from queue components and secondarily prioritizes dequeuing fairly across tenants. When disabled, the query scheduler primarily prioritizes tenant fairness. You must enable the `query-scheduler.use-multi-algorithm-query-queue` setting to use this flag. -query-scheduler.querier-forget-delay duration [experimental] If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled. -query-scheduler.ring.consul.acl-token string diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index b2a280d3b73..0d806b6c33f 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -1722,11 +1722,11 @@ The `query_scheduler` block configures the query-scheduler. # CLI flag: -query-scheduler.use-multi-algorithm-query-queue [use_multi_algorithm_query_queue: | default = false] -# (experimental) When enabled, the query scheduler will primarily prioritize -# dequeuing fairly from queue components, and secondarily prioritize dequeuing -# fairly across tenants. When disabled, tenant fairness is primarily -# prioritized. `query-scheduler.use-multi-algorithm-query-queue` must be enabled -# in order to use this flag. +# (experimental) When enabled, the query scheduler primarily prioritizes +# dequeuing fairly from queue components and secondarily prioritizes dequeuing +# fairly across tenants. When disabled, the query scheduler primarily +# prioritizes tenant fairness. You must enable the +# `query-scheduler.use-multi-algorithm-query-queue` setting to use this flag. # CLI flag: -query-scheduler.prioritize-query-components [prioritize_query_components: | default = false] diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 53843eb8716..fc08abb82b7 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -108,7 +108,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.IntVar(&cfg.MaxOutstandingPerTenant, "query-scheduler.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429.") f.BoolVar(&cfg.AdditionalQueryQueueDimensionsEnabled, "query-scheduler.additional-query-queue-dimensions-enabled", false, "Non-operational: Enqueue query requests with additional queue dimensions to split tenant request queues into subqueues. This enables separate requests to proceed from a tenant's subqueues even when other subqueues are blocked on slow query requests. Must be set on both query-frontend and scheduler to take effect. (default false)") f.BoolVar(&cfg.UseMultiAlgorithmQueryQueue, "query-scheduler.use-multi-algorithm-query-queue", false, "Use an experimental version of the query queue which has the same behavior as the existing queue, but integrates tenant selection into the tree model.") - f.BoolVar(&cfg.PrioritizeQueryComponents, "query-scheduler.prioritize-query-components", false, "When enabled, the query scheduler will primarily prioritize dequeuing fairly from queue components, and secondarily prioritize dequeuing fairly across tenants. When disabled, tenant fairness is primarily prioritized. `query-scheduler.use-multi-algorithm-query-queue` must be enabled in order to use this flag.") + f.BoolVar(&cfg.PrioritizeQueryComponents, "query-scheduler.prioritize-query-components", false, "When enabled, the query scheduler primarily prioritizes dequeuing fairly from queue components and secondarily prioritizes dequeuing fairly across tenants. When disabled, the query scheduler primarily prioritizes tenant fairness. You must enable the `query-scheduler.use-multi-algorithm-query-queue` setting to use this flag.") f.DurationVar(&cfg.QuerierForgetDelay, "query-scheduler.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.") cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f) From 315c2e8f7ab7397a939fc91abd729b193a99d49c Mon Sep 17 00:00:00 2001 From: Casie Chen Date: Fri, 16 Aug 2024 17:25:50 -0700 Subject: [PATCH 5/5] Fix test --- pkg/scheduler/queue/queue_test.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index afd226b2811..a573ea6406e 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -849,24 +849,25 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend req: req, } + var multiAlgorithmTreeQueuePath QueuePath + if queueDim == nil { + queueDim = []string{unknownQueueDimension} + } + if queueBroker.prioritizeQueryComponents { + multiAlgorithmTreeQueuePath = append(append(multiAlgorithmTreeQueuePath, queueDim...), "tenant-1") + } else { + multiAlgorithmTreeQueuePath = append([]string{"tenant-1"}, queueDim...) + } + // TODO (casie): Clean this up when deprecating legacy tree queue if tq, ok := queueBroker.tree.(*TreeQueue); ok { require.Nil(t, tq.getNode(QueuePath{"tenant-1"})) require.NoError(t, queueBroker.enqueueRequestBack(&tr, tenantMaxQueriers)) require.False(t, tq.getNode(QueuePath{"tenant-1"}).IsEmpty()) } else if itq, ok := queueBroker.tree.(*MultiQueuingAlgorithmTreeQueue); ok { - var path QueuePath - if queueDim == nil { - queueDim = []string{unknownQueueDimension} - } - if queueBroker.prioritizeQueryComponents { - path = append(append(path, queueDim...), "tenant-1") - } else { - path = append([]string{"tenant-1"}, queueDim...) - } - require.Nil(t, itq.GetNode(path)) + require.Nil(t, itq.GetNode(multiAlgorithmTreeQueuePath)) require.NoError(t, queueBroker.enqueueRequestBack(&tr, tenantMaxQueriers)) - require.False(t, itq.GetNode(path).IsEmpty()) + require.False(t, itq.GetNode(multiAlgorithmTreeQueuePath).IsEmpty()) } @@ -887,8 +888,7 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend if tq, ok := queueBroker.tree.(*TreeQueue); ok { require.False(t, tq.getNode(QueuePath{"tenant-1"}).IsEmpty()) } else if itq, ok := queueBroker.tree.(*MultiQueuingAlgorithmTreeQueue); ok { - path := queueBroker.makeQueuePathForTests("tenant-1") - require.False(t, itq.GetNode(path).IsEmpty()) + require.False(t, itq.GetNode(multiAlgorithmTreeQueuePath).IsEmpty()) } })