Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create flag to prioritize dequeuing from query components over tenant fairness #9016

Merged
merged 5 commits into from
Aug 17, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
* [ENHANCEMENT] Expose a new `s3.session-token` configuration option to enable using temporary security credentials. #8952
* [ENHANCEMENT] Make `-query-frontend.additional-query-queue-dimensions-enabled` and `-query-scheduler.additional-query-queue-dimensions-enabled` non-operational flags in preparation for removal. #8984
* [ENHANCEMENT] Add a new ingester endpoint to prepare instances to downscale. #8956
* [ENHANCEMENT] Query-scheduler: Add `query-scheduler.prioritize-query-components` to allow dequeuing from specific query components. #9016
francoposa marked this conversation as resolved.
Show resolved Hide resolved
* [BUGFIX] Ruler: add support for draining any outstanding alert notifications before shutting down. This can be enabled with the `-ruler.drain-notification-queue-on-shutdown=true` CLI flag. #8346
* [BUGFIX] Query-frontend: fix `-querier.max-query-lookback` enforcement when `-compactor.blocks-retention-period` is not set, and viceversa. #8388
* [BUGFIX] Ingester: fix sporadic `not found` error causing an internal server error if label names are queried with matchers during head compaction. #8391
Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -16033,6 +16033,17 @@
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "prioritize_query_components",
"required": false,
"desc": "When enabled, the query scheduler will primarily prioritize dequeuing fairly from queue components, and secondarily prioritize dequeuing fairly across tenants. When disabled, tenant fairness is primarily prioritized. `query-scheduler.use-multi-algorithm-query-queue` must be enabled in order to use this flag.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "query-scheduler.prioritize-query-components",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "querier_forget_delay",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2255,6 +2255,8 @@ Usage of ./cmd/mimir/mimir:
Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429. (default 100)
-query-scheduler.max-used-instances int
The maximum number of query-scheduler instances to use, regardless how many replicas are running. This option can be set only when -query-scheduler.service-discovery-mode is set to 'ring'. 0 to use all available query-scheduler instances.
-query-scheduler.prioritize-query-components
[experimental] When enabled, the query scheduler will primarily prioritize dequeuing fairly from queue components, and secondarily prioritize dequeuing fairly across tenants. When disabled, tenant fairness is primarily prioritized. `query-scheduler.use-multi-algorithm-query-queue` must be enabled in order to use this flag.
-query-scheduler.querier-forget-delay duration
[experimental] If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.
-query-scheduler.ring.consul.acl-token string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1722,6 +1722,14 @@ The `query_scheduler` block configures the query-scheduler.
# CLI flag: -query-scheduler.use-multi-algorithm-query-queue
[use_multi_algorithm_query_queue: <boolean> | default = false]

# (experimental) When enabled, the query scheduler will primarily prioritize
# dequeuing fairly from queue components, and secondarily prioritize dequeuing
# fairly across tenants. When disabled, tenant fairness is primarily
# prioritized. `query-scheduler.use-multi-algorithm-query-queue` must be enabled
# in order to use this flag.
chencs marked this conversation as resolved.
Show resolved Hide resolved
# CLI flag: -query-scheduler.prioritize-query-components
[prioritize_query_components: <boolean> | default = false]

# (experimental) If a querier disconnects without sending notification about
# graceful shutdown, the query-scheduler will keep the querier in the tenant's
# shard until the forget delay has passed. This feature is useful to reduce the
Expand Down
1 change: 1 addition & 0 deletions pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
log,
cfg.MaxOutstandingPerTenant,
false,
false,
cfg.QuerierForgetDelay,
f.queueLength,
f.discardedRequests,
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func NewRequestQueue(
log log.Logger,
maxOutstandingPerTenant int,
useMultiAlgoQueue bool,
prioritizeQueryComponents bool,
forgetDelay time.Duration,
queueLength *prometheus.GaugeVec,
discardedRequests *prometheus.CounterVec,
Expand Down Expand Up @@ -273,7 +274,7 @@ func NewRequestQueue(
waitingQuerierConnsToDispatch: list.New(),

QueryComponentUtilization: queryComponentCapacity,
queueBroker: newQueueBroker(maxOutstandingPerTenant, useMultiAlgoQueue, forgetDelay),
queueBroker: newQueueBroker(maxOutstandingPerTenant, useMultiAlgoQueue, prioritizeQueryComponents, forgetDelay),
}

q.Service = services.NewBasicService(q.starting, q.running, q.stop).WithName("request queue")
Expand Down
53 changes: 38 additions & 15 deletions pkg/scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,20 @@ import (
util_test "github.com/grafana/mimir/pkg/util/test"
)

// // TODO (casie): Write tests for prioritizeQueryComponents is true
// buildTreeTestsStruct returns all _allowed_ combinations of config flags for testing.
func buildTreeTestsStruct() []struct {
name string
useMultiAlgoTreeQueue bool
name string
useMultiAlgoTreeQueue bool
prioritizeQueryComponents bool
} {
return []struct {
name string
useMultiAlgoTreeQueue bool
name string
useMultiAlgoTreeQueue bool
prioritizeQueryComponents bool
}{
{"legacy tree queue", false},
{"integrated tree queue", true},
{"legacy tree queue with prioritize query components disabled", false, false},
{"integrated tree queue with prioritize query components disabled", true, false},
{"integrated tree queue with prioritize query components enabled", true, true},
}
}

Expand Down Expand Up @@ -107,9 +110,8 @@ func makeSchedulerRequest(tenantID string, additionalQueueDimensions []string) *
// round-robins between the multiple queues, which has the effect of alternately dequeuing from the slow queries
// and normal queries rather than blocking normal queries behind slow queries.
func TestMultiDimensionalQueueFairnessSlowConsumerEffects(t *testing.T) {
treeTypes := buildTreeTestsStruct()

for _, tt := range treeTypes {
for _, tt := range buildTreeTestsStruct() {
// Only test allowed combinations of these configs
t.Run(tt.name, func(t *testing.T) {
promRegistry := prometheus.NewPedanticRegistry()

Expand Down Expand Up @@ -153,6 +155,7 @@ func TestMultiDimensionalQueueFairnessSlowConsumerEffects(t *testing.T) {
log.NewNopLogger(),
maxOutstandingRequestsPerTenant,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetQuerierDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -264,6 +267,7 @@ func BenchmarkConcurrentQueueOperations(b *testing.B) {
log.NewNopLogger(),
maxOutstandingRequestsPerTenant,
t.useMultiAlgoTreeQueue,
t.prioritizeQueryComponents,
forgetQuerierDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -450,6 +454,7 @@ func TestRequestQueue_RegisterAndUnregisterQuerierWorkerConnections(t *testing.T
log.NewNopLogger(),
1,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -543,6 +548,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe
log.NewNopLogger(),
1,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -622,6 +628,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ReshardNotifiedCorrectlyForMultip
log.NewNopLogger(),
1,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -717,6 +724,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnAfterContextCancelled
log.NewNopLogger(),
1,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -777,6 +785,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnImmediatelyIfQuerierI
log.NewNopLogger(),
1,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -814,6 +823,7 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend
log.NewNopLogger(),
1,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand All @@ -824,14 +834,15 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend

// bypassing queue dispatcher loop for direct usage of the queueBroker and
// passing a waitingQuerierConn for a canceled querier connection
queueBroker := newQueueBroker(queue.maxOutstandingPerTenant, false, queue.forgetDelay)
queueBroker := newQueueBroker(queue.maxOutstandingPerTenant, tt.useMultiAlgoTreeQueue, tt.prioritizeQueryComponents, queue.forgetDelay)
queueBroker.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), querierID))

tenantMaxQueriers := 0 // no sharding
queueDim := randAdditionalQueueDimension(true)
req := &SchedulerRequest{
Ctx: context.Background(),
Request: &httpgrpc.HTTPRequest{Method: "GET", Url: "/hello"},
AdditionalQueueDimensions: randAdditionalQueueDimension(true),
AdditionalQueueDimensions: queueDim,
}
tr := tenantRequest{
tenantID: TenantID("tenant-1"),
Expand All @@ -844,9 +855,20 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend
require.NoError(t, queueBroker.enqueueRequestBack(&tr, tenantMaxQueriers))
require.False(t, tq.getNode(QueuePath{"tenant-1"}).IsEmpty())
} else if itq, ok := queueBroker.tree.(*MultiQueuingAlgorithmTreeQueue); ok {
require.Nil(t, itq.GetNode(QueuePath{"tenant-1"}))
var path QueuePath
if queueDim == nil {
queueDim = []string{unknownQueueDimension}
}
switch queueBroker.prioritizeQueryComponents {
case true:
francoposa marked this conversation as resolved.
Show resolved Hide resolved
path = append(append(path, queueDim...), "tenant-1")
default:
path = append([]string{"tenant-1"}, queueDim...)
}
require.Nil(t, itq.GetNode(path))
require.NoError(t, queueBroker.enqueueRequestBack(&tr, tenantMaxQueriers))
require.False(t, itq.GetNode(QueuePath{"tenant-1"}).IsEmpty())
require.False(t, itq.GetNode(path).IsEmpty())

}

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -866,7 +888,8 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend
if tq, ok := queueBroker.tree.(*TreeQueue); ok {
require.False(t, tq.getNode(QueuePath{"tenant-1"}).IsEmpty())
} else if itq, ok := queueBroker.tree.(*MultiQueuingAlgorithmTreeQueue); ok {
require.False(t, itq.GetNode(QueuePath{"tenant-1"}).IsEmpty())
path := queueBroker.makeQueuePathForTests("tenant-1")
require.False(t, itq.GetNode(path).IsEmpty())
}

})
Expand Down
29 changes: 22 additions & 7 deletions pkg/scheduler/queue/tenant_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,34 @@ type queueBroker struct {
func newQueueBroker(
maxTenantQueueSize int,
useMultiAlgoTreeQueue bool,
prioritizeQueryComponents bool,
forgetDelay time.Duration,
) *queueBroker {
tqas := newTenantQuerierAssignments(forgetDelay)
var tree Tree
var err error
if useMultiAlgoTreeQueue {
algos := []QueuingAlgorithm{
tqas, // root; QueuingAlgorithm selects tenants
&roundRobinState{}, // tenant queues; QueuingAlgorithm selects query component
&roundRobinState{}, // query components; QueuingAlgorithm selects query from local queue
var algos []QueuingAlgorithm
if prioritizeQueryComponents {
algos = []QueuingAlgorithm{
&roundRobinState{}, // root; QueuingAlgorithm selects query component
tqas, // query components; QueuingAlgorithm selects tenants
&roundRobinState{}, // tenant queues; QueuingAlgorithm selects from local queue

}
} else {
algos = []QueuingAlgorithm{
tqas, // root; QueuingAlgorithm selects tenants
&roundRobinState{}, // tenant queues; QueuingAlgorithm selects query component
&roundRobinState{}, // query components; QueuingAlgorithm selects query from local queue
}
}
tree, err = NewTree(algos...)
} else {
// by default, use the legacy tree queue
if prioritizeQueryComponents {
panic("cannot prioritize query components for legacy tree queue")
}
tree = NewTreeQueue("root")
}

Expand All @@ -58,9 +72,10 @@ func newQueueBroker(
panic(fmt.Sprintf("error creating the tree queue: %v", err))
}
qb := &queueBroker{
tree: tree,
tenantQuerierAssignments: tqas,
maxTenantQueueSize: maxTenantQueueSize,
tree: tree,
tenantQuerierAssignments: tqas,
maxTenantQueueSize: maxTenantQueueSize,
prioritizeQueryComponents: prioritizeQueryComponents,
}

return qb
Expand Down
Loading
Loading