max chunks per query limit shared between ingesters and storage gateways #4260

Merged 5 commits on Jun 29, 2021
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -2,6 +2,8 @@

## master / unreleased

* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit the maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storage combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260

## 1.10.0-rc.0 / 2021-06-28

* [CHANGE] Enable strict JSON unmarshal for `pkg/util/validation.Limits` struct. The custom `UnmarshalJSON()` will now fail if the input has unknown fields. #4298
@@ -70,7 +72,6 @@
* [BUGFIX] Store-gateway: when blocks sharding is enabled, do not load all blocks in each store-gateway in case of a cold startup, but load only blocks owned by the store-gateway replica. #4271
* [BUGFIX] Memberlist: fix to setting the default configuration value for `-memberlist.retransmit-factor` when not provided. This should improve propagation delay of the ring state (including, but not limited to, tombstones). Note that if the configuration is already explicitly given, this fix has no effect. #4269
* [BUGFIX] Querier: Fix issue where samples in a chunk might get skipped by batch iterator. #4218

## Blocksconvert

* [ENHANCEMENT] Scanner: add support for DynamoDB (v9 schema only). #3828
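The CHANGELOG entry above captures the core of this PR: rather than the distributor enforcing its own per-ingester chunk budget, a single per-query limiter is attached to the request context and charged by both the ingester and the store-gateway fetch paths. Below is a minimal sketch of that flow using the `limiter` APIs added in this diff; `chargeChunks` and its call sites are hypothetical stand-ins for `queryIngesterStream` and `fetchSeriesFromStores`.

```go
package main

import (
	"context"
	"fmt"

	"github.com/cortexproject/cortex/pkg/util/limiter"
)

// chargeChunks stands in for the two read paths touched by this PR
// (distributor.queryIngesterStream and blocksStoreQuerier.fetchSeriesFromStores):
// both pull the limiter out of the request context and charge the same counter.
func chargeChunks(ctx context.Context, source string, chunks int) error {
	ql := limiter.QueryLimiterFromContextWithFallback(ctx)
	if err := ql.AddChunks(chunks); err != nil {
		return fmt.Errorf("%s: %w", source, err)
	}
	return nil
}

func main() {
	// 0 disables the series and chunk-bytes limits; 5 chunks max for the whole query.
	ctx := limiter.AddQueryLimiterToContext(context.Background(),
		limiter.NewQueryLimiter(0, 0, 5))

	// Neither side exceeds the budget on its own, but the combined total does.
	fmt.Println(chargeChunks(ctx, "ingesters", 3))      // <nil>
	fmt.Println(chargeChunks(ctx, "store-gateways", 3)) // ... hit the max number of chunks limit (limit: 5 chunks)
}
```

Keeping the limiter in the context also removes the need to plumb the user ID and the limit value through the fetch functions, which is the simplification visible in pkg/distributor/query.go below.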
8 changes: 3 additions & 5 deletions docs/configuration/config-file-reference.md
@@ -4043,11 +4043,9 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
[max_chunks_per_query: <int> | default = 2000000]

# Maximum number of chunks that can be fetched in a single query from ingesters
# and long-term storage: the total number of actual fetched chunks could be 2x
# the limit, being independently applied when querying ingesters and long-term
# storage. This limit is enforced in the ingester (if chunks streaming is
# enabled), querier, ruler and store-gateway. Takes precedence over the
# deprecated -store.query-chunk-limit. 0 to disable.
# and long-term storage. This limit is enforced in the querier, ruler and
# store-gateway. Takes precedence over the deprecated -store.query-chunk-limit.
# 0 to disable.
# CLI flag: -querier.max-fetched-chunks-per-query
[max_fetched_chunks_per_query: <int> | default = 0]

6 changes: 4 additions & 2 deletions pkg/distributor/distributor_test.go
@@ -912,6 +912,8 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac
shardByAllLabels: true,
limits: limits,
})

ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, maxChunksLimit))
defer stopAll(ds, r)

// Push a number of series below the max chunks limit. Each series has 1 sample,
@@ -957,7 +959,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac
ctx := user.InjectOrgID(context.Background(), "user")
limits := &validation.Limits{}
flagext.DefaultValues(limits)
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0))
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0))

// Prepare distributors.
ds, _, r, _ := prepare(t, prepConfig{
@@ -1043,7 +1045,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
var maxBytesLimit = (seriesToAdd) * responseChunkSize

// Update the limiter with the calculated limits.
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, maxBytesLimit))
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, maxBytesLimit, 0))

// Push a number of series below the max chunk bytes limit. Subtract one for the series added above.
writeReq = makeWriteRequest(0, seriesToAdd-1, 0)
27 changes: 4 additions & 23 deletions pkg/distributor/query.go
@@ -2,7 +2,6 @@ package distributor

import (
"context"
"fmt"
"io"
"sort"
"time"
@@ -11,7 +10,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/instrument"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/cortexpb"
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
@@ -24,10 +22,6 @@ import (
"github.com/cortexproject/cortex/pkg/util/validation"
)

var (
errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from ingesters for %s (limit: %d)"
)

// Query multiple ingesters and returns a Matrix of samples.
func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
var matrix model.Matrix
@@ -86,11 +80,6 @@ func (d *Distributor) QueryExemplars(ctx context.Context, from, to model.Time, m
func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error) {
var result *ingester_client.QueryStreamResponse
err := instrument.CollectedRequest(ctx, "Distributor.QueryStream", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error {
userID, err := tenant.TenantID(ctx)
if err != nil {
return err
}

req, err := ingester_client.ToQueryRequest(from, to, matchers)
if err != nil {
return err
@@ -101,7 +90,7 @@ func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matc
return err
}

result, err = d.queryIngesterStream(ctx, userID, replicationSet, req)
result, err = d.queryIngesterStream(ctx, replicationSet, req)
if err != nil {
return err
}
@@ -290,10 +279,8 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe
}

// queryIngesterStream queries the ingesters using the new streaming API.
func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) {
func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) {
var (
chunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID)
chunksCount = atomic.Int32{}
queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
)

@@ -327,14 +314,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, re
}

// Enforce the max chunks limits.
if chunksLimit > 0 {
if count := int(chunksCount.Add(int32(resp.ChunksCount()))); count > chunksLimit {
// We expect to be always able to convert the label matchers back to Prometheus ones.
// In case we fail (unexpected) the error will not include the matchers, but the core
// logic doesn't break.
matchers, _ := ingester_client.FromLabelMatchers(req.Matchers)
return nil, validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), chunksLimit))
}
if chunkLimitErr := queryLimiter.AddChunks(resp.ChunksCount()); chunkLimitErr != nil {
return nil, validation.LimitError(chunkLimitErr.Error())
}

for _, series := range resp.Chunkseries {
3 changes: 3 additions & 0 deletions pkg/querier/blocks_store_queryable.go
@@ -633,6 +633,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunksSize); chunkBytesLimitErr != nil {
return validation.LimitError(chunkBytesLimitErr.Error())
}
if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks)); chunkLimitErr != nil {
return validation.LimitError(chunkLimitErr.Error())
}
}

if w := resp.GetWarning(); w != "" {
62 changes: 59 additions & 3 deletions pkg/querier/blocks_store_queryable_test.go
@@ -51,7 +51,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
metricNameLabel = labels.Label{Name: labels.MetricName, Value: metricName}
series1Label = labels.Label{Name: "series", Value: "1"}
series2Label = labels.Label{Name: "series", Value: "2"}
noOpQueryLimiter = limiter.NewQueryLimiter(0, 0)
noOpQueryLimiter = limiter.NewQueryLimiter(0, 0, 0)
)

type valueResult struct {
@@ -454,6 +454,24 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
queryLimiter: noOpQueryLimiter,
expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 1)),
},
"max chunks per query limit hit while fetching chunks at first attempt - global limit": {
finderResult: bucketindex.Blocks{
{ID: block1},
{ID: block2},
},
storeSetResponses: []interface{}{
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1),
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2),
mockHintsResponse(block1, block2),
}}: {block1, block2},
},
},
limits: &blocksStoreLimitsMock{},
queryLimiter: limiter.NewQueryLimiter(0, 0, 1),
expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, 1)),
},
"max chunks per query limit hit while fetching chunks during subsequent attempts": {
finderResult: bucketindex.Blocks{
{ID: block1},
@@ -492,6 +510,44 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
queryLimiter: noOpQueryLimiter,
expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 3)),
},
"max chunks per query limit hit while fetching chunks during subsequent attempts - global": {
finderResult: bucketindex.Blocks{
{ID: block1},
{ID: block2},
{ID: block3},
{ID: block4},
},
storeSetResponses: []interface{}{
// First attempt returns a client whose response does not include all expected blocks.
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1),
mockHintsResponse(block1),
}}: {block1, block3},
&storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT, 2),
mockHintsResponse(block2),
}}: {block2, block4},
},
// Second attempt returns 1 missing block.
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2),
mockHintsResponse(block3),
}}: {block3, block4},
},
// Third attempt returns the last missing block.
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "4.4.4.4", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT+1, 3),
mockHintsResponse(block4),
}}: {block4},
},
},
limits: &blocksStoreLimitsMock{},
queryLimiter: limiter.NewQueryLimiter(0, 0, 3),
expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, 3)),
},
"max series per query limit hit while fetching chunks": {
finderResult: bucketindex.Blocks{
{ID: block1},
@@ -507,7 +563,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
},
},
limits: &blocksStoreLimitsMock{},
queryLimiter: limiter.NewQueryLimiter(1, 0),
queryLimiter: limiter.NewQueryLimiter(1, 0, 0),
expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, 1)),
},
"max chunk bytes per query limit hit while fetching chunks": {
@@ -525,7 +581,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
},
},
limits: &blocksStoreLimitsMock{maxChunksPerQuery: 1},
queryLimiter: limiter.NewQueryLimiter(0, 8),
queryLimiter: limiter.NewQueryLimiter(0, 8, 0),
expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunkBytesHit, 8)),
},
}
2 changes: 1 addition & 1 deletion pkg/querier/querier.go
@@ -225,7 +225,7 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter,
return nil, err
}

ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxFetchedChunkBytesPerQuery(userID)))
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxFetchedChunkBytesPerQuery(userID), limits.MaxChunksPerQuery(userID)))

mint, maxt, err = validateQueryTimeRange(ctx, userID, mint, maxt, limits, cfg.MaxQueryIntoFuture)
if err == errEmptyTimeRange {
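This one-line change is where the shared budget is created: `NewQueryable` resolves the per-tenant limits once and attaches a single `QueryLimiter` to the request context. A hedged sketch of the wiring; the `tenantLimits` interface and the helper are illustrative only, while the three methods it lists are the ones the diff actually calls on the limits object.

```go
package querylimits

import (
	"context"

	"github.com/cortexproject/cortex/pkg/util/limiter"
)

// tenantLimits is a hypothetical stand-in for validation.Overrides.
type tenantLimits interface {
	MaxFetchedSeriesPerQuery(userID string) int
	MaxFetchedChunkBytesPerQuery(userID string) int
	MaxChunksPerQuery(userID string) int
}

// contextWithQueryLimiter mirrors what NewQueryable does in this PR: build one
// QueryLimiter from the tenant's limits and hang it off the request context so
// that ingester and store-gateway fetches draw from the same budget.
func contextWithQueryLimiter(ctx context.Context, userID string, limits tenantLimits) context.Context {
	ql := limiter.NewQueryLimiter(
		limits.MaxFetchedSeriesPerQuery(userID),
		limits.MaxFetchedChunkBytesPerQuery(userID),
		limits.MaxChunksPerQuery(userID), // 0 leaves the chunk limit disabled
	)
	return limiter.AddQueryLimiterToContext(ctx, ql)
}
```

Code paths that are not routed through this queryable still behave sensibly: `QueryLimiterFromContextWithFallback` (see pkg/util/limiter/query_limiter.go below) returns `NewQueryLimiter(0, 0, 0)`, i.e. a limiter with every per-query limit disabled.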
25 changes: 20 additions & 5 deletions pkg/util/limiter/query_limiter.go
@@ -15,30 +15,34 @@ import (
type queryLimiterCtxKey struct{}

var (
ctxKey = &queryLimiterCtxKey{}
ErrMaxSeriesHit = "the query hit the max number of series limit (limit: %d series)"
ErrMaxChunkBytesHit = "the query hit the aggregated chunks size limit (limit: %d bytes)"
ctxKey = &queryLimiterCtxKey{}
ErrMaxSeriesHit = "the query hit the max number of series limit (limit: %d series)"
ErrMaxChunkBytesHit = "the query hit the aggregated chunks size limit (limit: %d bytes)"
ErrMaxChunksPerQueryLimit = "the query hit the max number of chunks limit (limit: %d chunks)"
)

type QueryLimiter struct {
uniqueSeriesMx sync.Mutex
uniqueSeries map[model.Fingerprint]struct{}

chunkBytesCount atomic.Int64
chunkCount atomic.Int64

maxSeriesPerQuery int
maxChunkBytesPerQuery int
maxChunksPerQuery int
}

// NewQueryLimiter makes a new per-query limiter. Each query limiter
// is configured using the `maxSeriesPerQuery` limit.
func NewQueryLimiter(maxSeriesPerQuery, maxChunkBytesPerQuery int) *QueryLimiter {
func NewQueryLimiter(maxSeriesPerQuery, maxChunkBytesPerQuery int, maxChunksPerQuery int) *QueryLimiter {
return &QueryLimiter{
uniqueSeriesMx: sync.Mutex{},
uniqueSeries: map[model.Fingerprint]struct{}{},

maxSeriesPerQuery: maxSeriesPerQuery,
maxChunkBytesPerQuery: maxChunkBytesPerQuery,
maxChunksPerQuery: maxChunksPerQuery,
}
}

@@ -52,7 +56,7 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter {
ql, ok := ctx.Value(ctxKey).(*QueryLimiter)
if !ok {
// If there's no limiter return a new unlimited limiter as a fallback
ql = NewQueryLimiter(0, 0)
ql = NewQueryLimiter(0, 0, 0)
}
return ql
}
@@ -93,3 +97,14 @@ func (ql *QueryLimiter) AddChunkBytes(chunkSizeInBytes int) error {
}
return nil
}

func (ql *QueryLimiter) AddChunks(count int) error {
if ql.maxChunksPerQuery == 0 {
return nil
}

if ql.chunkCount.Add(int64(count)) > int64(ql.maxChunksPerQuery) {
return fmt.Errorf(fmt.Sprintf(ErrMaxChunksPerQueryLimit, ql.maxChunksPerQuery))
}
return nil
}
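`AddChunks` keeps its running total in an `atomic.Int64` rather than under the `uniqueSeriesMx` mutex, presumably so the per-ingester and per-store-gateway goroutines of a single query can charge chunks concurrently without serialising on a lock (only `AddSeries` needs the mutex, since it maintains a map). A small standalone illustration of that concurrent use, not taken from the PR:

```go
package main

import (
	"fmt"
	"sync"

	"github.com/cortexproject/cortex/pkg/util/limiter"
)

func main() {
	ql := limiter.NewQueryLimiter(0, 0, 100) // chunk limit only

	var wg sync.WaitGroup
	errs := make(chan error, 8)

	// Eight concurrent "fetchers" add 20 chunks each: 160 > 100, so the calls
	// that push the total over the limit return an error. The atomic counter
	// keeps the bookkeeping race-free.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := ql.AddChunks(20); err != nil {
				errs <- err
			}
		}()
	}
	wg.Wait()
	close(errs)

	for err := range errs {
		fmt.Println(err)
	}
}
```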
8 changes: 4 additions & 4 deletions pkg/util/limiter/query_limiter_test.go
@@ -25,7 +25,7 @@ func TestQueryLimiter_AddSeries_ShouldReturnNoErrorOnLimitNotExceeded(t *testing
labels.MetricName: metricName + "_2",
"series2": "1",
})
limiter = NewQueryLimiter(100, 0)
limiter = NewQueryLimiter(100, 0, 0)
)
err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1))
assert.NoError(t, err)
@@ -53,7 +53,7 @@ func TestQueryLimiter_AddSeriers_ShouldReturnErrorOnLimitExceeded(t *testing.T)
labels.MetricName: metricName + "_2",
"series2": "1",
})
limiter = NewQueryLimiter(1, 0)
limiter = NewQueryLimiter(1, 0, 0)
)
err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1))
require.NoError(t, err)
@@ -62,7 +62,7 @@
}

func TestQueryLimiter_AddChunkBytes(t *testing.T) {
var limiter = NewQueryLimiter(0, 100)
var limiter = NewQueryLimiter(0, 100, 0)

err := limiter.AddChunkBytes(100)
require.NoError(t, err)
@@ -84,7 +84,7 @@ func BenchmarkQueryLimiter_AddSeries(b *testing.B) {
}
b.ResetTimer()

limiter := NewQueryLimiter(b.N+1, 0)
limiter := NewQueryLimiter(b.N+1, 0, 0)
for _, s := range series {
err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s))
assert.NoError(b, err)
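The edits above adapt the existing tests to the three-argument constructor, but this diff view shows no dedicated test for the new `AddChunks` method. A sketch of what one could look like, in the style of `TestQueryLimiter_AddChunkBytes`; the test name and assertions are hypothetical.

```go
package limiter

import (
	"fmt"
	"testing"

	"github.com/stretchr/testify/require"
)

// TestQueryLimiter_AddChunks exercises the new per-query chunk-count limit.
func TestQueryLimiter_AddChunks(t *testing.T) {
	limiter := NewQueryLimiter(0, 0, 100)

	// Exactly at the limit: still allowed.
	require.NoError(t, limiter.AddChunks(100))

	// One more chunk pushes the total over the limit.
	err := limiter.AddChunks(1)
	require.Error(t, err)
	require.Equal(t, fmt.Sprintf(ErrMaxChunksPerQueryLimit, 100), err.Error())
}
```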
6 changes: 2 additions & 4 deletions pkg/util/validation/limits.go
@@ -151,7 +151,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxGlobalMetricsWithMetadataPerUser, "ingester.max-global-metadata-per-user", 0, "The maximum number of active metrics with metadata per user, across the cluster. 0 to disable. Supported only if -distributor.shard-by-all-labels is true.")
f.IntVar(&l.MaxGlobalMetadataPerMetric, "ingester.max-global-metadata-per-metric", 0, "The maximum number of metadata per metric, across the cluster. 0 to disable.")
f.IntVar(&l.MaxChunksPerQueryFromStore, "store.query-chunk-limit", 2e6, "Deprecated. Use -querier.max-fetched-chunks-per-query CLI flag and its respective YAML config option instead. Maximum number of chunks that can be fetched in a single query. This limit is enforced when fetching chunks from the long-term storage only. When running the Cortex chunks storage, this limit is enforced in the querier and ruler, while when running the Cortex blocks storage this limit is enforced in the querier, ruler and store-gateway. 0 to disable.")
f.IntVar(&l.MaxChunksPerQuery, "querier.max-fetched-chunks-per-query", 0, "Maximum number of chunks that can be fetched in a single query from ingesters and long-term storage: the total number of actual fetched chunks could be 2x the limit, being independently applied when querying ingesters and long-term storage. This limit is enforced in the ingester (if chunks streaming is enabled), querier, ruler and store-gateway. Takes precedence over the deprecated -store.query-chunk-limit. 0 to disable.")
f.IntVar(&l.MaxChunksPerQuery, "querier.max-fetched-chunks-per-query", 0, "Maximum number of chunks that can be fetched in a single query from ingesters and long-term storage. This limit is enforced in the querier, ruler and store-gateway. Takes precedence over the deprecated -store.query-chunk-limit. 0 to disable.")
f.IntVar(&l.MaxFetchedSeriesPerQuery, "querier.max-fetched-series-per-query", 0, "The maximum number of unique series for which a query can fetch samples from each ingesters and blocks storage. This limit is enforced in the querier only when running Cortex with blocks storage. 0 to disable")
f.IntVar(&l.MaxFetchedChunkBytesPerQuery, "querier.max-fetched-chunk-bytes-per-query", 0, "The maximum size of all chunks in bytes that a query can fetch from each ingester and storage. This limit is enforced in the querier and ruler only when running Cortex with blocks storage. 0 to disable.")
f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit the query time range (end - start time). This limit is enforced in the query-frontend (on the received query), in the querier (on the query possibly split by the query-frontend) and in the chunks storage. 0 to disable.")
@@ -398,9 +398,7 @@ func (o *Overrides) MaxChunksPerQueryFromStore(userID string) int {
return o.getOverridesForUser(userID).MaxChunksPerQueryFromStore
}

// MaxChunksPerQueryFromIngesters returns the maximum number of chunks allowed per query when fetching
// chunks from ingesters.
func (o *Overrides) MaxChunksPerQueryFromIngesters(userID string) int {
func (o *Overrides) MaxChunksPerQuery(userID string) int {
return o.getOverridesForUser(userID).MaxChunksPerQuery
}

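The flag help above says the new limit takes precedence over the deprecated `-store.query-chunk-limit`. Purely as an illustration of that documented precedence: this helper is not part of the PR, and the PR's actual resolution logic is not shown in this diff.

```go
package validation // hypothetical placement, illustration only

// effectiveMaxChunksPerQuery sketches the documented precedence: when the new
// -querier.max-fetched-chunks-per-query limit is set it wins; otherwise the
// deprecated -store.query-chunk-limit still applies. For both flags, 0 means
// "disabled".
func effectiveMaxChunksPerQuery(maxFetchedChunksPerQuery, deprecatedStoreQueryChunkLimit int) int {
	if maxFetchedChunksPerQuery > 0 {
		return maxFetchedChunksPerQuery
	}
	return deprecatedStoreQueryChunkLimit
}
```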