From 7399a1f656d4b78a2d9bd10c02afe6151816773a Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 4 Jun 2021 13:29:37 -0700 Subject: [PATCH] Addressing comments --- pkg/distributor/query.go | 3 +- pkg/querier/blocks_store_queryable.go | 26 ++++++++-- pkg/querier/blocks_store_queryable_test.go | 60 +++++++++++++++++++++- pkg/querier/querier.go | 2 +- pkg/util/limiter/query_limiter.go | 8 ++- pkg/util/validation/limits.go | 4 ++ 6 files changed, 88 insertions(+), 15 deletions(-) diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index f3c2ccfbb8..1d7b1da49e 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -210,8 +210,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri } // Enforce the max chunks limits. - matchers, _ := ingester_client.FromLabelMatchers(req.Matchers) - if chunkLimitErr := queryLimiter.AddChunks(resp.ChunksCount(), matchers); chunkLimitErr != nil { + if chunkLimitErr := queryLimiter.AddChunks(resp.ChunksCount()); chunkLimitErr != nil { return nil, validation.LimitError(chunkLimitErr.Error()) } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 2d1fd41db5..98cb61e140 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -54,7 +54,8 @@ const ( ) var ( - errNoStoreGatewayAddress = errors.New("no store-gateway address configured") + errNoStoreGatewayAddress = errors.New("no store-gateway address configured") + errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from store-gateways for %s (limit: %d)" ) // BlocksStoreSet is the interface used to get the clients to query series on a set of blocks. 
@@ -402,11 +403,14 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* resSeriesSets = []storage.SeriesSet(nil) resWarnings = storage.Warnings(nil) + maxChunksLimit = q.limits.MaxChunksPerQueryFromStore(q.userID) + leftChunksLimit = maxChunksLimit + resultMtx sync.Mutex ) queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) { - seriesSets, queriedBlocks, warnings, _, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, convertedMatchers) + seriesSets, queriedBlocks, warnings, numChunks, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, convertedMatchers, maxChunksLimit, leftChunksLimit) if err != nil { return nil, err } @@ -416,6 +420,11 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* resSeriesSets = append(resSeriesSets, seriesSets...) resWarnings = append(resWarnings, warnings...) + // Given a single block is guaranteed to not be queried twice, we can safely decrease the number of + // chunks we can still read before hitting the limit (max == 0 means disabled). + if maxChunksLimit > 0 { + leftChunksLimit -= numChunks + } resultMtx.Unlock() return queriedBlocks, nil @@ -543,6 +552,8 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( maxT int64, matchers []*labels.Matcher, convertedMatchers []storepb.LabelMatcher, + maxChunksLimit int, + leftChunksLimit int, ) ([]storage.SeriesSet, []ulid.ULID, storage.Warnings, int, error) { var ( reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID) @@ -609,10 +620,12 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( } // Ensure the max number of chunks limit hasn't been reached (max == 0 means disabled). 
- if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks), matchers); chunkLimitErr != nil { - return validation.LimitError(chunkLimitErr.Error()) + if maxChunksLimit > 0 { + actual := numChunks.Add(int32(len(s.Chunks))) + if actual > int32(leftChunksLimit) { + return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit)) + } } - chunksSize := 0 for _, c := range s.Chunks { chunksSize += c.Size() @@ -620,6 +633,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunksSize); chunkBytesLimitErr != nil { return validation.LimitError(chunkBytesLimitErr.Error()) } + if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks)); chunkLimitErr != nil { + return validation.LimitError(chunkLimitErr.Error()) + } } if w := resp.GetWarning(); w != "" { diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 8b348f7e94..40e868be4a 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -451,8 +451,26 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{maxChunksPerQuery: 1}, + queryLimiter: noOpQueryLimiter, + expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 1)), + }, + "max chunks per query limit hit while fetching chunks at first attempt - global limit": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), + mockHintsResponse(block1, block2), + }}: {block1, block2}, + }, + }, + limits: &blocksStoreLimitsMock{}, 
queryLimiter: limiter.NewQueryLimiter(0, 0, 1), - expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 1)), + expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, 1)), }, "max chunks per query limit hit while fetching chunks during subsequent attempts": { finderResult: bucketindex.Blocks{ @@ -489,8 +507,46 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{maxChunksPerQuery: 3}, + queryLimiter: noOpQueryLimiter, + expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 3)), + }, + "max chunks per query limit hit while fetching chunks during subsequent attempts - global": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + {ID: block3}, + {ID: block4}, + }, + storeSetResponses: []interface{}{ + // First attempt returns a client whose response does not include all expected blocks. + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), + mockHintsResponse(block1), + }}: {block1, block3}, + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT, 2), + mockHintsResponse(block2), + }}: {block2, block4}, + }, + // Second attempt returns 1 missing block. + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), + mockHintsResponse(block3), + }}: {block3, block4}, + }, + // Third attempt returns the last missing block. 
+ map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "4.4.4.4", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT+1, 3), + mockHintsResponse(block4), + }}: {block4}, + }, + }, + limits: &blocksStoreLimitsMock{}, queryLimiter: limiter.NewQueryLimiter(0, 0, 3), - expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 3)), + expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, 3)), }, "max series per query limit hit while fetching chunks": { finderResult: bucketindex.Blocks{ diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index b351e48ef5..ab7e63cfa9 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -224,7 +224,7 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, return nil, err } - ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxFetchedChunkBytesPerQuery(userID), limits.MaxChunksPerQueryFromStore(userID))) + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxFetchedChunkBytesPerQuery(userID), limits.MaxChunksPerQuery(userID))) mint, maxt, err = validateQueryTimeRange(ctx, userID, mint, maxt, limits, cfg.MaxQueryIntoFuture) if err == errEmptyTimeRange { diff --git a/pkg/util/limiter/query_limiter.go b/pkg/util/limiter/query_limiter.go index 1fdd951d6d..b5c33c62ef 100644 --- a/pkg/util/limiter/query_limiter.go +++ b/pkg/util/limiter/query_limiter.go @@ -6,12 +6,10 @@ import ( "sync" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/pkg/labels" "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/util" ) type queryLimiterCtxKey struct{} @@ -20,7 +18,7 @@ 
var ( ctxKey = &queryLimiterCtxKey{} ErrMaxSeriesHit = "the query hit the max number of series limit (limit: %d series)" ErrMaxChunkBytesHit = "the query hit the aggregated chunks size limit (limit: %d bytes)" - ErrMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from ingesters for %s (limit: %d)" + ErrMaxChunksPerQueryLimit = "the query hit the max number of chunks limit (limit: %d chunks)" ) type QueryLimiter struct { @@ -100,13 +98,13 @@ func (ql *QueryLimiter) AddChunkBytes(chunkSizeInBytes int) error { return nil } -func (ql *QueryLimiter) AddChunks(count int, matchers []*labels.Matcher) error { +func (ql *QueryLimiter) AddChunks(count int) error { if ql.maxChunksPerQuery == 0 { return nil } if ql.chunkCount.Add(int64(count)) > int64(ql.maxChunksPerQuery) { - return fmt.Errorf(fmt.Sprintf(ErrMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), ql.maxChunksPerQuery)) + return fmt.Errorf(ErrMaxChunksPerQueryLimit, ql.maxChunksPerQuery) } return nil } diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 3085583bee..77228bdb76 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -388,6 +388,10 @@ func (o *Overrides) MaxChunksPerQueryFromStore(userID string) int { return o.getOverridesForUser(userID).MaxChunksPerQueryFromStore } +func (o *Overrides) MaxChunksPerQuery(userID string) int { + return o.getOverridesForUser(userID).MaxChunksPerQuery +} + // MaxFetchedSeriesPerQuery returns the maximum number of series allowed per query when fetching // chunks from ingesters and blocks storage. func (o *Overrides) MaxFetchedSeriesPerQuery(userID string) int {