max chunks per query limit shared between ingesters and storage gateways #4260

Merged (5 commits, Jun 29, 2021)

Changes from 3 commits
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -11,6 +11,7 @@
- `-alertmanager.receivers-firewall.block.private-addresses` renamed to `-alertmanager.receivers-firewall-block-private-addresses`
* [CHANGE] Change default value of `-server.grpc.keepalive.min-time-between-pings` to `10s` and `-server.grpc.keepalive.ping-without-stream-allowed` to `true`. #4168
* [CHANGE] Ingester: Change default value of `-ingester.active-series-metrics-enabled` to `true`. This incurs a small increase in memory usage, between 1.2% and 1.6% as measured on ingesters with 1.3M active series. #4257
* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit the maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storage combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`.
* [FEATURE] Querier: Added new `-querier.max-fetched-series-per-query` flag. When Cortex is running with blocks storage, the max series per query limit is enforced in the querier and applies to unique series received from ingesters and store-gateway (long-term storage). #4179
* [FEATURE] Querier/Ruler: Added new `-querier.max-fetched-chunk-bytes-per-query` flag. When Cortex is running with blocks storage, the max chunk bytes limit is enforced in the querier and ruler and limits the size of all aggregated chunks returned from ingesters and storage as bytes for a query. #4216
* [FEATURE] Alertmanager: Added rate-limits to notifiers. Rate limits used by all integrations can be configured using `-alertmanager.notification-rate-limit`, while per-integration rate limits can be specified via `-alertmanager.notification-rate-limit-per-integration` parameter. Both shared and per-integration limits can be overwritten using overrides mechanism. These limits are applied on individual (per-tenant) alertmanagers. Rate-limited notifications are failed notifications. It is possible to monitor rate-limited notifications via new `cortex_alertmanager_notification_rate_limited_total` metric. #4135 #4163
@@ -59,11 +60,13 @@
* [BUGFIX] Fixed cache fetch error on Redis Cluster. #4056
* [BUGFIX] Ingester: fix issue where runtime limits erroneously override default limits. #4246
* [BUGFIX] Ruler: fix startup in single-binary mode when the new `ruler_storage` is used. #4252
<<<<<<< HEAD
Contributor: Bit of merge conflict debris to clean up

Member (author): Sorry for that :D

* [BUGFIX] Querier: fix queries failing with "at least 1 healthy replica required, could only find 0" error right after scaling up store-gateways until they're ACTIVE in the ring. #4263
* [BUGFIX] Store-gateway: when blocks sharding is enabled, do not load all blocks in each store-gateway in case of a cold startup, but load only blocks owned by the store-gateway replica. #4271
* [BUGFIX] Memberlist: fix to setting the default configuration value for `-memberlist.retransmit-factor` when not provided. This should improve propagation delay of the ring state (including, but not limited to, tombstones). Note that if the configuration is already explicitly given, this fix has no effect. #4269
* [BUGFIX] Querier: Fix issue where samples in a chunk might get skipped by batch iterator. #4218

=======
>>>>>>> 723e6e847 (Addressing comments - 2)
## Blocksconvert

* [ENHANCEMENT] Scanner: add support for DynamoDB (v9 schema only). #3828
8 changes: 3 additions & 5 deletions docs/configuration/config-file-reference.md
@@ -4043,11 +4043,9 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
[max_chunks_per_query: <int> | default = 2000000]

# Maximum number of chunks that can be fetched in a single query from ingesters
# and long-term storage: the total number of actual fetched chunks could be 2x
# the limit, being independently applied when querying ingesters and long-term
# storage. This limit is enforced in the ingester (if chunks streaming is
# enabled), querier, ruler and store-gateway. Takes precedence over the
# deprecated -store.query-chunk-limit. 0 to disable.
# and long-term storage. This limit is enforced in the querier, ruler and
# store-gateway. Takes precedence over the deprecated -store.query-chunk-limit.
# 0 to disable.
# CLI flag: -querier.max-fetched-chunks-per-query
[max_fetched_chunks_per_query: <int> | default = 0]

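For orientation, a hypothetical YAML form of this limit (the snippet is illustrative only, and is equivalent to passing `-querier.max-fetched-chunks-per-query=2000000` on the command line):

```yaml
# Illustrative example only: cap the combined number of chunks fetched
# from ingesters and long-term storage for any single query.
limits:
  max_fetched_chunks_per_query: 2000000
```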
6 changes: 4 additions & 2 deletions pkg/distributor/distributor_test.go
@@ -912,6 +912,8 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac
shardByAllLabels: true,
limits: limits,
})

ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, maxChunksLimit))
defer stopAll(ds, r)

// Push a number of series below the max chunks limit. Each series has 1 sample,
@@ -957,7 +959,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac
ctx := user.InjectOrgID(context.Background(), "user")
limits := &validation.Limits{}
flagext.DefaultValues(limits)
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0))
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0))

// Prepare distributors.
ds, _, r, _ := prepare(t, prepConfig{
@@ -1043,7 +1045,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
var maxBytesLimit = (seriesToAdd) * responseChunkSize

// Update the limiter with the calculated limits.
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, maxBytesLimit))
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, maxBytesLimit, 0))

// Push a number of series below the max chunk bytes limit. Subtract one for the series added above.
writeReq = makeWriteRequest(0, seriesToAdd-1, 0)
27 changes: 4 additions & 23 deletions pkg/distributor/query.go
@@ -2,7 +2,6 @@ package distributor

import (
"context"
"fmt"
"io"
"sort"
"time"
@@ -11,7 +10,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/instrument"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/cortexpb"
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
@@ -24,10 +22,6 @@ import (
"github.com/cortexproject/cortex/pkg/util/validation"
)

var (
errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from ingesters for %s (limit: %d)"
)

// Query multiple ingesters and returns a Matrix of samples.
func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
var matrix model.Matrix
@@ -86,11 +80,6 @@ func (d *Distributor) QueryExemplars(ctx context.Context, from, to model.Time, m
func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error) {
var result *ingester_client.QueryStreamResponse
err := instrument.CollectedRequest(ctx, "Distributor.QueryStream", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error {
userID, err := tenant.TenantID(ctx)
if err != nil {
return err
}

req, err := ingester_client.ToQueryRequest(from, to, matchers)
if err != nil {
return err
@@ -101,7 +90,7 @@
return err
}

result, err = d.queryIngesterStream(ctx, userID, replicationSet, req)
result, err = d.queryIngesterStream(ctx, replicationSet, req)
if err != nil {
return err
}
@@ -290,10 +279,8 @@ func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSe
}

// queryIngesterStream queries the ingesters using the new streaming API.
func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) {
func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) {
var (
chunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID)
chunksCount = atomic.Int32{}
queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
)

@@ -327,14 +314,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, re
}

// Enforce the max chunks limits.
if chunksLimit > 0 {
if count := int(chunksCount.Add(int32(resp.ChunksCount()))); count > chunksLimit {
// We expect to be always able to convert the label matchers back to Prometheus ones.
// In case we fail (unexpected) the error will not include the matchers, but the core
// logic doesn't break.
matchers, _ := ingester_client.FromLabelMatchers(req.Matchers)
return nil, validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), chunksLimit))
}
if chunkLimitErr := queryLimiter.AddChunks(resp.ChunksCount()); chunkLimitErr != nil {
return nil, validation.LimitError(chunkLimitErr.Error())
}

for _, series := range resp.Chunkseries {
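Taken together, the deletions above remove the distributor-local chunk counting (the `chunksCount` atomic checked against a per-user ingester limit) in favor of the shared `QueryLimiter` carried in the request context. A minimal, runnable sketch of that pattern with simplified stand-in types (not the real Cortex signatures):

```go
// Sketch only: simplified stand-ins for limiter.QueryLimiter and its
// context helpers; shows how responses from many ingesters charge one
// shared per-query chunk budget.
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

type queryLimiter struct {
	maxChunks  int
	chunkCount atomic.Int64
}

// addChunks charges n chunks against the shared per-query budget.
func (l *queryLimiter) addChunks(n int) error {
	if l.maxChunks == 0 {
		return nil // zero disables the limit
	}
	if l.chunkCount.Add(int64(n)) > int64(l.maxChunks) {
		return fmt.Errorf("the query hit the max number of chunks limit (limit: %d chunks)", l.maxChunks)
	}
	return nil
}

type limiterKey struct{}

// fromContext returns the query's limiter, or an unlimited fallback,
// mirroring QueryLimiterFromContextWithFallback.
func fromContext(ctx context.Context) *queryLimiter {
	if l, ok := ctx.Value(limiterKey{}).(*queryLimiter); ok {
		return l
	}
	return &queryLimiter{} // maxChunks == 0, i.e. unlimited
}

func main() {
	ctx := context.WithValue(context.Background(), limiterKey{}, &queryLimiter{maxChunks: 3})

	// Streams from two different ingesters charge the same counter.
	l := fromContext(ctx)
	fmt.Println(l.addChunks(2)) // <nil>: running total 2 <= 3
	fmt.Println(l.addChunks(2)) // error: running total 4 > 3
}
```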
3 changes: 3 additions & 0 deletions pkg/querier/blocks_store_queryable.go
@@ -633,6 +633,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunksSize); chunkBytesLimitErr != nil {
return validation.LimitError(chunkBytesLimitErr.Error())
}
if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks)); chunkLimitErr != nil {
return validation.LimitError(chunkLimitErr.Error())
}
}

if w := resp.GetWarning(); w != "" {
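The three added lines give the store-gateway fetch path the same bookkeeping as the distributor: each received series charges the shared limiter for its chunk bytes and its chunk count, and whichever limit trips first aborts the query. A rough, self-contained sketch of that ordering (hypothetical types, not the real storepb ones):

```go
// Sketch only: hypothetical per-series accounting; the real code uses
// limiter.QueryLimiter, storepb series and validation.LimitError.
package main

import (
	"errors"
	"fmt"
)

type budget struct {
	maxBytes, maxChunks int // zero disables a limit
	bytes, chunks       int
}

// addSeries charges one series' chunks against both shared limits,
// checking bytes first and then count, as in fetchSeriesFromStores.
func (b *budget) addSeries(chunkSizes []int) error {
	for _, sz := range chunkSizes {
		b.bytes += sz
	}
	if b.maxBytes > 0 && b.bytes > b.maxBytes {
		return errors.New("chunk bytes limit hit")
	}
	b.chunks += len(chunkSizes)
	if b.maxChunks > 0 && b.chunks > b.maxChunks {
		return errors.New("chunk count limit hit")
	}
	return nil
}

func main() {
	b := &budget{maxBytes: 1024, maxChunks: 3}
	fmt.Println(b.addSeries([]int{100, 100})) // <nil>
	fmt.Println(b.addSeries([]int{100, 100})) // chunk count limit hit (4 > 3)
}
```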
62 changes: 59 additions & 3 deletions pkg/querier/blocks_store_queryable_test.go
@@ -51,7 +51,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
metricNameLabel = labels.Label{Name: labels.MetricName, Value: metricName}
series1Label = labels.Label{Name: "series", Value: "1"}
series2Label = labels.Label{Name: "series", Value: "2"}
noOpQueryLimiter = limiter.NewQueryLimiter(0, 0)
noOpQueryLimiter = limiter.NewQueryLimiter(0, 0, 0)
)

type valueResult struct {
@@ -454,6 +454,24 @@
queryLimiter: noOpQueryLimiter,
expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 1)),
},
"max chunks per query limit hit while fetching chunks at first attempt - global limit": {
finderResult: bucketindex.Blocks{
{ID: block1},
{ID: block2},
},
storeSetResponses: []interface{}{
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1),
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2),
mockHintsResponse(block1, block2),
}}: {block1, block2},
},
},
limits: &blocksStoreLimitsMock{},
queryLimiter: limiter.NewQueryLimiter(0, 0, 1),
expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, 1)),
},
"max chunks per query limit hit while fetching chunks during subsequent attempts": {
finderResult: bucketindex.Blocks{
{ID: block1},
@@ -492,6 +510,44 @@
queryLimiter: noOpQueryLimiter,
expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 3)),
},
"max chunks per query limit hit while fetching chunks during subsequent attempts - global": {
finderResult: bucketindex.Blocks{
{ID: block1},
{ID: block2},
{ID: block3},
{ID: block4},
},
storeSetResponses: []interface{}{
// First attempt returns a client whose response does not include all expected blocks.
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1),
mockHintsResponse(block1),
}}: {block1, block3},
&storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT, 2),
mockHintsResponse(block2),
}}: {block2, block4},
},
// Second attempt returns 1 missing block.
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2),
mockHintsResponse(block3),
}}: {block3, block4},
},
// Third attempt returns the last missing block.
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "4.4.4.4", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT+1, 3),
mockHintsResponse(block4),
}}: {block4},
},
},
limits: &blocksStoreLimitsMock{},
queryLimiter: limiter.NewQueryLimiter(0, 0, 3),
expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, 3)),
},
"max series per query limit hit while fetching chunks": {
finderResult: bucketindex.Blocks{
{ID: block1},
@@ -507,7 +563,7 @@
},
},
limits: &blocksStoreLimitsMock{},
queryLimiter: limiter.NewQueryLimiter(1, 0),
queryLimiter: limiter.NewQueryLimiter(1, 0, 0),
expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, 1)),
},
"max chunk bytes per query limit hit while fetching chunks": {
@@ -525,7 +581,7 @@
},
},
limits: &blocksStoreLimitsMock{maxChunksPerQuery: 1},
queryLimiter: limiter.NewQueryLimiter(0, 8),
queryLimiter: limiter.NewQueryLimiter(0, 8, 0),
expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunkBytesHit, 8)),
},
}
2 changes: 1 addition & 1 deletion pkg/querier/querier.go
@@ -225,7 +225,7 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter,
return nil, err
}

ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxFetchedChunkBytesPerQuery(userID)))
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxFetchedChunkBytesPerQuery(userID), limits.MaxChunksPerQuery(userID)))

mint, maxt, err = validateQueryTimeRange(ctx, userID, mint, maxt, limits, cfg.MaxQueryIntoFuture)
if err == errEmptyTimeRange {
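The one-line change above threads the tenant's chunk limit into the per-query limiter at query setup, alongside the existing series and chunk-byte limits. A hedged sketch of that wiring, with hypothetical stand-ins for the Cortex limits getters:

```go
// Sketch only: hypothetical simplified wiring; the real code calls the
// per-tenant getters MaxFetchedSeriesPerQuery, MaxFetchedChunkBytesPerQuery
// and MaxChunksPerQuery on the limits overrides.
package main

import "fmt"

type tenantLimits struct {
	maxSeries, maxChunkBytes, maxChunks int
}

type queryLimiter struct {
	maxSeries, maxChunkBytes, maxChunks int
}

// limiterFor builds one fresh limiter per query from the tenant's limits,
// as NewQueryable does before adding it to the request context.
func limiterFor(l tenantLimits) *queryLimiter {
	return &queryLimiter{
		maxSeries:     l.maxSeries,
		maxChunkBytes: l.maxChunkBytes,
		maxChunks:     l.maxChunks,
	}
}

func main() {
	ql := limiterFor(tenantLimits{maxChunks: 2_000_000})
	fmt.Printf("%+v\n", *ql)
}
```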
25 changes: 20 additions & 5 deletions pkg/util/limiter/query_limiter.go
@@ -15,30 +15,34 @@ import (
type queryLimiterCtxKey struct{}

var (
ctxKey = &queryLimiterCtxKey{}
ErrMaxSeriesHit = "the query hit the max number of series limit (limit: %d series)"
ErrMaxChunkBytesHit = "the query hit the aggregated chunks size limit (limit: %d bytes)"
ctxKey = &queryLimiterCtxKey{}
ErrMaxSeriesHit = "the query hit the max number of series limit (limit: %d series)"
ErrMaxChunkBytesHit = "the query hit the aggregated chunks size limit (limit: %d bytes)"
ErrMaxChunksPerQueryLimit = "the query hit the max number of chunks limit (limit: %d chunks)"
)

type QueryLimiter struct {
uniqueSeriesMx sync.Mutex
uniqueSeries map[model.Fingerprint]struct{}

chunkBytesCount atomic.Int64
chunkCount atomic.Int64

maxSeriesPerQuery int
maxChunkBytesPerQuery int
maxChunksPerQuery int
}

// NewQueryLimiter makes a new per-query limiter. Each query limiter is
// configured using the `maxSeriesPerQuery`, `maxChunkBytesPerQuery` and
// `maxChunksPerQuery` limits.
func NewQueryLimiter(maxSeriesPerQuery, maxChunkBytesPerQuery int) *QueryLimiter {
func NewQueryLimiter(maxSeriesPerQuery, maxChunkBytesPerQuery int, maxChunksPerQuery int) *QueryLimiter {
return &QueryLimiter{
uniqueSeriesMx: sync.Mutex{},
uniqueSeries: map[model.Fingerprint]struct{}{},

maxSeriesPerQuery: maxSeriesPerQuery,
maxChunkBytesPerQuery: maxChunkBytesPerQuery,
maxChunksPerQuery: maxChunksPerQuery,
}
}

@@ -52,7 +56,7 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter {
ql, ok := ctx.Value(ctxKey).(*QueryLimiter)
if !ok {
// If there's no limiter return a new unlimited limiter as a fallback
ql = NewQueryLimiter(0, 0)
ql = NewQueryLimiter(0, 0, 0)
}
return ql
}
@@ -93,3 +97,14 @@ func (ql *QueryLimiter) AddChunkBytes(chunkSizeInBytes int) error {
}
return nil
}

// AddChunks adds the given chunk count to the query's running total and
// returns an error if the per-query max number of chunks is exceeded.
func (ql *QueryLimiter) AddChunks(count int) error {
if ql.maxChunksPerQuery == 0 {
return nil
}

if ql.chunkCount.Add(int64(count)) > int64(ql.maxChunksPerQuery) {
return fmt.Errorf(ErrMaxChunksPerQueryLimit, ql.maxChunksPerQuery)
}
return nil
}
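A short usage sketch of the limiter as defined above (illustrative values; the running total is cumulative across all AddChunks calls for the query):

```go
package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/util/limiter"
)

func main() {
	// Series and chunk-byte limits disabled (0), chunk limit set to 2.
	ql := limiter.NewQueryLimiter(0, 0, 2)

	fmt.Println(ql.AddChunks(1)) // <nil>: running total 1 <= 2
	fmt.Println(ql.AddChunks(2)) // limit error: running total 3 > 2
}
```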
8 changes: 4 additions & 4 deletions pkg/util/limiter/query_limiter_test.go
@@ -25,7 +25,7 @@ func TestQueryLimiter_AddSeries_ShouldReturnNoErrorOnLimitNotExceeded(t *testing
labels.MetricName: metricName + "_2",
"series2": "1",
})
limiter = NewQueryLimiter(100, 0)
limiter = NewQueryLimiter(100, 0, 0)
)
err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1))
assert.NoError(t, err)
@@ -53,7 +53,7 @@ func TestQueryLimiter_AddSeriers_ShouldReturnErrorOnLimitExceeded(t *testing.T)
labels.MetricName: metricName + "_2",
"series2": "1",
})
limiter = NewQueryLimiter(1, 0)
limiter = NewQueryLimiter(1, 0, 0)
)
err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1))
require.NoError(t, err)
@@ -62,7 +62,7 @@
}

func TestQueryLimiter_AddChunkBytes(t *testing.T) {
var limiter = NewQueryLimiter(0, 100)
var limiter = NewQueryLimiter(0, 100, 0)

err := limiter.AddChunkBytes(100)
require.NoError(t, err)
@@ -84,7 +84,7 @@ func BenchmarkQueryLimiter_AddSeries(b *testing.B) {
}
b.ResetTimer()

limiter := NewQueryLimiter(b.N+1, 0)
limiter := NewQueryLimiter(b.N+1, 0, 0)
for _, s := range series {
err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s))
assert.NoError(b, err)