Skip to content

Commit

Permalink
Create flag to prioritize dequeuing from query components before tena…
Browse files Browse the repository at this point in the history
…nt fairness
  • Loading branch information
chencs committed Aug 15, 2024
1 parent 9b342ef commit 8f15ec1
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 33 deletions.
1 change: 1 addition & 0 deletions pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
log,
cfg.MaxOutstandingPerTenant,
false,
false,
cfg.QuerierForgetDelay,
f.queueLength,
f.discardedRequests,
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func NewRequestQueue(
log log.Logger,
maxOutstandingPerTenant int,
useMultiAlgoQueue bool,
prioritizeQueryComponents bool,
forgetDelay time.Duration,
queueLength *prometheus.GaugeVec,
discardedRequests *prometheus.CounterVec,
Expand Down Expand Up @@ -273,7 +274,7 @@ func NewRequestQueue(
waitingQuerierConnsToDispatch: list.New(),

QueryComponentUtilization: queryComponentCapacity,
queueBroker: newQueueBroker(maxOutstandingPerTenant, useMultiAlgoQueue, forgetDelay),
queueBroker: newQueueBroker(maxOutstandingPerTenant, useMultiAlgoQueue, prioritizeQueryComponents, forgetDelay),
}

q.Service = services.NewBasicService(q.starting, q.running, q.stop).WithName("request queue")
Expand Down
32 changes: 21 additions & 11 deletions pkg/scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,20 @@ import (
util_test "github.com/grafana/mimir/pkg/util/test"
)

// // TODO (casie): Write tests for prioritizeQueryComponents is true
// buildTreeTestsStruct returns all _allowed_ combinations of config flags for testing.
func buildTreeTestsStruct() []struct {
name string
useMultiAlgoTreeQueue bool
name string
useMultiAlgoTreeQueue bool
prioritizeQueryComponents bool
} {
return []struct {
name string
useMultiAlgoTreeQueue bool
name string
useMultiAlgoTreeQueue bool
prioritizeQueryComponents bool
}{
{"legacy tree queue", false},
{"integrated tree queue", true},
{"legacy tree queue with prioritize query components disabled", false, false},
{"integrated tree queue with prioritize query components disabled", true, false},
{"integrated tree queue with prioritize query components enabled", true, true},
}
}

Expand Down Expand Up @@ -107,9 +110,8 @@ func makeSchedulerRequest(tenantID string, additionalQueueDimensions []string) *
// round-robins between the multiple queues, which has the effect of alternately dequeuing from the slow queries
// and normal queries rather than blocking normal queries behind slow queries.
func TestMultiDimensionalQueueFairnessSlowConsumerEffects(t *testing.T) {
treeTypes := buildTreeTestsStruct()

for _, tt := range treeTypes {
for _, tt := range buildTreeTestsStruct() {
// Only test allowed combinations of these configs
t.Run(tt.name, func(t *testing.T) {
promRegistry := prometheus.NewPedanticRegistry()

Expand Down Expand Up @@ -153,6 +155,7 @@ func TestMultiDimensionalQueueFairnessSlowConsumerEffects(t *testing.T) {
log.NewNopLogger(),
maxOutstandingRequestsPerTenant,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetQuerierDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -264,6 +267,7 @@ func BenchmarkConcurrentQueueOperations(b *testing.B) {
log.NewNopLogger(),
maxOutstandingRequestsPerTenant,
t.useMultiAlgoTreeQueue,
t.prioritizeQueryComponents,
forgetQuerierDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -450,6 +454,7 @@ func TestRequestQueue_RegisterAndUnregisterQuerierWorkerConnections(t *testing.T
log.NewNopLogger(),
1,
tt.useMultiAlgoTreeQueue,
false,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -543,6 +548,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe
log.NewNopLogger(),
1,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -622,6 +628,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ReshardNotifiedCorrectlyForMultip
log.NewNopLogger(),
1,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -717,6 +724,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnAfterContextCancelled
log.NewNopLogger(),
1,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -777,6 +785,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnImmediatelyIfQuerierI
log.NewNopLogger(),
1,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -814,6 +823,7 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend
log.NewNopLogger(),
1,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand All @@ -824,7 +834,7 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend

// bypassing queue dispatcher loop for direct usage of the queueBroker and
// passing a waitingQuerierConn for a canceled querier connection
queueBroker := newQueueBroker(queue.maxOutstandingPerTenant, false, queue.forgetDelay)
queueBroker := newQueueBroker(queue.maxOutstandingPerTenant, false, false, queue.forgetDelay)
queueBroker.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), querierID))

tenantMaxQueriers := 0 // no sharding
Expand Down
29 changes: 22 additions & 7 deletions pkg/scheduler/queue/tenant_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,34 @@ type queueBroker struct {
func newQueueBroker(
maxTenantQueueSize int,
useMultiAlgoTreeQueue bool,
prioritizeQueryComponents bool,
forgetDelay time.Duration,
) *queueBroker {
tqas := newTenantQuerierAssignments(forgetDelay)
var tree Tree
var err error
if useMultiAlgoTreeQueue {
algos := []QueuingAlgorithm{
tqas, // root; QueuingAlgorithm selects tenants
&roundRobinState{}, // tenant queues; QueuingAlgorithm selects query component
&roundRobinState{}, // query components; QueuingAlgorithm selects query from local queue
var algos []QueuingAlgorithm
if prioritizeQueryComponents {
algos = []QueuingAlgorithm{
&roundRobinState{}, // root; QueuingAlgorithm selects query component
tqas, // query components; QueuingAlgorithm selects tenants
&roundRobinState{}, // tenant queues; QueuingAlgorithm selects from local queue

}
} else {
algos = []QueuingAlgorithm{
tqas, // root; QueuingAlgorithm selects tenants
&roundRobinState{}, // tenant queues; QueuingAlgorithm selects query component
&roundRobinState{}, // query components; QueuingAlgorithm selects query from local queue
}
}
tree, err = NewTree(algos...)
} else {
// by default, use the legacy tree queue
if prioritizeQueryComponents {
panic("cannot prioritize query components for legacy tree queue")
}
tree = NewTreeQueue("root")
}

Expand All @@ -58,9 +72,10 @@ func newQueueBroker(
panic(fmt.Sprintf("error creating the tree queue: %v", err))
}
qb := &queueBroker{
tree: tree,
tenantQuerierAssignments: tqas,
maxTenantQueueSize: maxTenantQueueSize,
tree: tree,
tenantQuerierAssignments: tqas,
maxTenantQueueSize: maxTenantQueueSize,
prioritizeQueryComponents: prioritizeQueryComponents,
}

return qb
Expand Down
50 changes: 36 additions & 14 deletions pkg/scheduler/queue/tenant_queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestQueues_NoShuffleSharding(t *testing.T) {
treeTypes := buildTreeTestsStruct()
for _, tt := range treeTypes {
t.Run(tt.name, func(t *testing.T) {
qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, 0)
qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, tt.prioritizeQueryComponents, 0)
assert.NotNil(t, qb)
assert.NoError(t, isConsistent(qb))

Expand Down Expand Up @@ -217,14 +217,12 @@ func TestQueues_NoShuffleSharding(t *testing.T) {
}

func TestQueuesRespectMaxTenantQueueSizeWithSubQueues(t *testing.T) {
// TODO (casie): When implementing prioritizeQueryComponents, add tests here, since we will be able to have
// multiple nodes for a single tenant
treeTypes := buildTreeTestsStruct()

for _, tt := range treeTypes {
t.Run(tt.name, func(t *testing.T) {
maxTenantQueueSize := 100
qb := newQueueBroker(maxTenantQueueSize, tt.useMultiAlgoTreeQueue, 0)
qb := newQueueBroker(maxTenantQueueSize, tt.useMultiAlgoTreeQueue, tt.prioritizeQueryComponents, 0)
additionalQueueDimensions := map[int][]string{
0: nil,
1: {"ingester"},
Expand Down Expand Up @@ -254,17 +252,38 @@ func TestQueuesRespectMaxTenantQueueSizeWithSubQueues(t *testing.T) {
if tq, ok := qb.tree.(*TreeQueue); ok {
assert.Equal(t, maxTenantQueueSize, tq.getNode(queuePath).ItemCount())
} else if itq, ok := qb.tree.(*MultiQueuingAlgorithmTreeQueue); ok {
assert.Equal(t, maxTenantQueueSize, itq.GetNode(queuePath).ItemCount())
var itemCount int
// if prioritizeQueryComponents, we need to build paths for each queue dimension
// and sum all items
if qb.prioritizeQueryComponents {
for _, addlQueueDim := range additionalQueueDimensions {
var path QueuePath
path = append(append(path, addlQueueDim...), "tenant-1")
if addlQueueDim == nil {
path = qb.makeQueuePathForTests("tenant-1")
}
itemCount += itq.GetNode(path).ItemCount()
}
assert.Equal(t, maxTenantQueueSize, itemCount)

} else {
assert.Equal(t, maxTenantQueueSize, itq.GetNode(queuePath).ItemCount())
}
}

// assert equal distribution of queue items between tenant node and 3 subnodes
// assert equal distribution of queue items between 4 subnodes
for _, v := range additionalQueueDimensions {
var checkPath QueuePath
if v == nil {
checkPath = append(QueuePath{"tenant-1"}, unknownQueueDimension)
} else {
v = []string{unknownQueueDimension}
}
switch qb.prioritizeQueryComponents {
case true:
checkPath = append(append(checkPath, v...), "tenant-1")
default:
checkPath = append(QueuePath{"tenant-1"}, v...)
}

// TODO (casie): After deprecating legacy tree queue, clean this up
var itemCount int
if tq, ok := qb.tree.(*TreeQueue); ok {
Expand Down Expand Up @@ -308,7 +327,7 @@ func TestQueuesOnTerminatingQuerier(t *testing.T) {
treeTypes := buildTreeTestsStruct()
for _, tt := range treeTypes {
t.Run(tt.name, func(t *testing.T) {
qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, 0)
qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, tt.prioritizeQueryComponents, 0)
assert.NotNil(t, qb)
assert.NoError(t, isConsistent(qb))

Expand Down Expand Up @@ -382,7 +401,7 @@ func TestQueues_QuerierDistribution(t *testing.T) {
treeTypes := buildTreeTestsStruct()
for _, tt := range treeTypes {
t.Run(tt.name, func(t *testing.T) {
qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, 0)
qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, tt.prioritizeQueryComponents, 0)
assert.NotNil(t, qb)
assert.NoError(t, isConsistent(qb))

Expand Down Expand Up @@ -462,7 +481,7 @@ func TestQueuesConsistency(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, testData.forgetDelay)
qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, tt.prioritizeQueryComponents, testData.forgetDelay)
assert.NotNil(t, qb)
assert.NoError(t, isConsistent(qb))

Expand Down Expand Up @@ -523,7 +542,7 @@ func TestQueues_ForgetDelay(t *testing.T) {
for _, tt := range treeTypes {
t.Run(tt.name, func(t *testing.T) {
now := time.Now()
qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, forgetDelay)
qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, tt.prioritizeQueryComponents, forgetDelay)
assert.NotNil(t, qb)
assert.NoError(t, isConsistent(qb))

Expand Down Expand Up @@ -636,7 +655,7 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget
for _, tt := range treeTypes {
t.Run(tt.name, func(t *testing.T) {
now := time.Now()
qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, forgetDelay)
qb := newQueueBroker(0, tt.useMultiAlgoTreeQueue, tt.prioritizeQueryComponents, forgetDelay)
assert.NotNil(t, qb)
assert.NoError(t, isConsistent(qb))

Expand Down Expand Up @@ -764,7 +783,7 @@ func (qb *queueBroker) getOrAddTenantQueue(tenantID TenantID, maxQueriers int) e
// removeTenantQueue is a test utility, not intended for use by consumers of queueBroker
func (qb *queueBroker) removeTenantQueue(tenantID TenantID) bool {
qb.tenantQuerierAssignments.removeTenant(tenantID)
queuePath := QueuePath{string(tenantID)}
queuePath := qb.makeQueuePathForTests(tenantID)

// TODO (casie): When deprecating legacy tree queue, clean this up
if tq, ok := qb.tree.(*TreeQueue); ok {
Expand Down Expand Up @@ -811,6 +830,9 @@ func (n *Node) deleteNode(pathFromNode QueuePath) bool {
}

func (qb *queueBroker) makeQueuePathForTests(tenantID TenantID) QueuePath {
if qb.prioritizeQueryComponents {
return QueuePath{unknownQueueDimension, string(tenantID)}
}
return QueuePath{string(tenantID)}
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package scheduler
import (
"context"
"flag"
"fmt"
"io"
"net/http"
"strings"
Expand Down Expand Up @@ -96,6 +97,7 @@ type Config struct {
MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant"`
AdditionalQueryQueueDimensionsEnabled bool `yaml:"additional_query_queue_dimensions_enabled" category:"experimental"`
UseMultiAlgorithmQueryQueue bool `yaml:"use_multi_algorithm_query_queue" category:"experimental"`
PrioritizeQueryComponents bool `yaml:"prioritize_query_components" category:"experimental"`
QuerierForgetDelay time.Duration `yaml:"querier_forget_delay" category:"experimental"`

GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=This configures the gRPC client used to report errors back to the query-frontend."`
Expand All @@ -106,13 +108,18 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.IntVar(&cfg.MaxOutstandingPerTenant, "query-scheduler.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429.")
f.BoolVar(&cfg.AdditionalQueryQueueDimensionsEnabled, "query-scheduler.additional-query-queue-dimensions-enabled", false, "Non-operational: Enqueue query requests with additional queue dimensions to split tenant request queues into subqueues. This enables separate requests to proceed from a tenant's subqueues even when other subqueues are blocked on slow query requests. Must be set on both query-frontend and scheduler to take effect. (default false)")
f.BoolVar(&cfg.UseMultiAlgorithmQueryQueue, "query-scheduler.use-multi-algorithm-query-queue", false, "Use an experimental version of the query queue which has the same behavior as the existing queue, but integrates tenant selection into the tree model.")
f.BoolVar(&cfg.PrioritizeQueryComponents, "query-scheduler.prioritize-query-components", false, "When enabled, the query scheduler will primarily prioritize dequeuing fairly from queue components, and secondarily prioritize dequeuing fairly across tenants. When disabled, tenant fairness is primarily prioritized. `query-scheduler.use-multi-algorithm-query-queue` must be enabled in order to use this flag.")
f.DurationVar(&cfg.QuerierForgetDelay, "query-scheduler.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.")

cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f)
cfg.ServiceDiscovery.RegisterFlags(f, logger)
}

func (cfg *Config) Validate() error {
if cfg.PrioritizeQueryComponents && !cfg.UseMultiAlgorithmQueryQueue {
return fmt.Errorf("cannot enable query-scheduler.prioritize-query-components without query-scheduler.use-multi-algorithm-query-queue")
}

return cfg.ServiceDiscovery.Validate()
}

Expand Down Expand Up @@ -162,6 +169,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
s.log,
cfg.MaxOutstandingPerTenant,
cfg.UseMultiAlgorithmQueryQueue,
cfg.PrioritizeQueryComponents,
cfg.QuerierForgetDelay,
s.queueLength,
s.discardedRequests,
Expand Down

0 comments on commit 8f15ec1

Please sign in to comment.