Addressing comments
alanprot committed Jun 4, 2021
1 parent b0039f7 commit 7399a1f
Showing 6 changed files with 88 additions and 15 deletions.
3 changes: 1 addition & 2 deletions pkg/distributor/query.go
@@ -210,8 +210,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
}

// Enforce the max chunks limits.
- matchers, _ := ingester_client.FromLabelMatchers(req.Matchers)
- if chunkLimitErr := queryLimiter.AddChunks(resp.ChunksCount(), matchers); chunkLimitErr != nil {
+ if chunkLimitErr := queryLimiter.AddChunks(resp.ChunksCount()); chunkLimitErr != nil {
return nil, validation.LimitError(chunkLimitErr.Error())
}

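A note on the distributor change above: the matchers were only used to decorate the limiter's error message, and their conversion error was silently discarded (matchers, _ := ...). With the message now built inside the limiter from the limit alone, the call site shrinks to a plain count. A minimal sketch of the per-response accounting pattern, using simplified stand-in types rather than the real Cortex ones (this toy limiter is not concurrency-safe; the real one is, as sketched after the query_limiter.go diff below):

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the ingester stream response and the query limiter.
type seriesResponse struct{ chunksCount int }

type chunkLimiter struct{ count, max int }

func (l *chunkLimiter) AddChunks(n int) error {
	l.count += n
	if l.max > 0 && l.count > l.max {
		return errors.New("the query hit the max number of chunks limit")
	}
	return nil
}

// consumeStream mirrors queryIngesterStream's accounting: each response's
// chunk count is charged to the query-wide limiter before it is accepted.
func consumeStream(responses []seriesResponse, ql *chunkLimiter) error {
	for _, resp := range responses {
		if err := ql.AddChunks(resp.chunksCount); err != nil {
			return err // Cortex wraps this as a validation.LimitError
		}
	}
	return nil
}

func main() {
	ql := &chunkLimiter{max: 3}
	err := consumeStream([]seriesResponse{{2}, {2}}, ql)
	fmt.Println(err) // the second response pushes the count past the limit
}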
26 changes: 21 additions & 5 deletions pkg/querier/blocks_store_queryable.go
@@ -54,7 +54,8 @@ const (
)

var (
- errNoStoreGatewayAddress = errors.New("no store-gateway address configured")
+ errNoStoreGatewayAddress  = errors.New("no store-gateway address configured")
+ errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from store-gateways for %s (limit: %d)"
)

// BlocksStoreSet is the interface used to get the clients to query series on a set of blocks.
@@ -402,11 +403,14 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
resSeriesSets = []storage.SeriesSet(nil)
resWarnings = storage.Warnings(nil)

+ maxChunksLimit  = q.limits.MaxChunksPerQueryFromStore(q.userID)
+ leftChunksLimit = maxChunksLimit

resultMtx sync.Mutex
)

queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) {
- seriesSets, queriedBlocks, warnings, _, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, convertedMatchers)
+ seriesSets, queriedBlocks, warnings, numChunks, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, convertedMatchers, maxChunksLimit, leftChunksLimit)
if err != nil {
return nil, err
}
@@ -416,6 +420,11 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
resSeriesSets = append(resSeriesSets, seriesSets...)
resWarnings = append(resWarnings, warnings...)

+ // Given a single block is guaranteed to not be queried twice, we can safely decrease the number of
+ // chunks we can still read before hitting the limit (max == 0 means disabled).
+ if maxChunksLimit > 0 {
+ leftChunksLimit -= numChunks
+ }
resultMtx.Unlock()

return queriedBlocks, nil
@@ -543,6 +552,8 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
maxT int64,
matchers []*labels.Matcher,
convertedMatchers []storepb.LabelMatcher,
+ maxChunksLimit int,
+ leftChunksLimit int,
) ([]storage.SeriesSet, []ulid.ULID, storage.Warnings, int, error) {
var (
reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID)
@@ -609,17 +620,22 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
}

// Ensure the max number of chunks limit hasn't been reached (max == 0 means disabled).
- if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks), matchers); chunkLimitErr != nil {
- return validation.LimitError(chunkLimitErr.Error())
+ if maxChunksLimit > 0 {
+ actual := numChunks.Add(int32(len(s.Chunks)))
+ if actual > int32(leftChunksLimit) {
+ return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit))
+ }
}

chunksSize := 0
for _, c := range s.Chunks {
chunksSize += c.Size()
}
if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunksSize); chunkBytesLimitErr != nil {
return validation.LimitError(chunkBytesLimitErr.Error())
}
+ if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks)); chunkLimitErr != nil {
+ return validation.LimitError(chunkLimitErr.Error())
+ }
}

if w := resp.GetWarning(); w != "" {
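Taken together, the blocks_store_queryable.go changes restore a store-gateway-specific chunk limit alongside the new global one: maxChunksLimit/leftChunksLimit track a budget that queryFunc decrements under resultMtx after each attempt (safe because a block is never queried twice), while queryLimiter.AddChunks charges the same chunks against the per-query limiter. A hedged sketch of the budget bookkeeping with simplified names; the real code splits the check (an atomic numChunks counter inside fetchSeriesFromStores) from the decrement (under the mutex in queryFunc):

package main

import (
	"fmt"
	"sync"
)

// chunkBudget models maxChunksLimit/leftChunksLimit: how many chunks a query
// may still fetch from store-gateways across retry attempts. max == 0 means
// the limit is disabled.
type chunkBudget struct {
	mtx  sync.Mutex
	max  int
	left int
}

// spend charges the chunks fetched by one attempt against the budget.
func (b *chunkBudget) spend(numChunks int) error {
	if b.max == 0 {
		return nil
	}
	b.mtx.Lock()
	defer b.mtx.Unlock()
	b.left -= numChunks
	if b.left < 0 {
		return fmt.Errorf("the query hit the max number of chunks limit while fetching chunks from store-gateways (limit: %d)", b.max)
	}
	return nil
}

func main() {
	b := &chunkBudget{max: 3, left: 3}
	fmt.Println(b.spend(2)) // <nil>: 1 chunk left for later attempts
	fmt.Println(b.spend(2)) // error: the budget is exhausted
}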
60 changes: 58 additions & 2 deletions pkg/querier/blocks_store_queryable_test.go
@@ -451,8 +451,26 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
},
},
limits: &blocksStoreLimitsMock{maxChunksPerQuery: 1},
queryLimiter: noOpQueryLimiter,
expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 1)),
},
"max chunks per query limit hit while fetching chunks at first attempt - global limit": {
finderResult: bucketindex.Blocks{
{ID: block1},
{ID: block2},
},
storeSetResponses: []interface{}{
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1),
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2),
mockHintsResponse(block1, block2),
}}: {block1, block2},
},
},
limits: &blocksStoreLimitsMock{},
queryLimiter: limiter.NewQueryLimiter(0, 0, 1),
- expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 1)),
+ expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, 1)),
},
"max chunks per query limit hit while fetching chunks during subsequent attempts": {
finderResult: bucketindex.Blocks{
@@ -489,8 +507,46 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
},
},
limits: &blocksStoreLimitsMock{maxChunksPerQuery: 3},
queryLimiter: noOpQueryLimiter,
expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 3)),
},
"max chunks per query limit hit while fetching chunks during subsequent attempts - global": {
finderResult: bucketindex.Blocks{
{ID: block1},
{ID: block2},
{ID: block3},
{ID: block4},
},
storeSetResponses: []interface{}{
// First attempt returns a client whose response does not include all expected blocks.
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1),
mockHintsResponse(block1),
}}: {block1, block3},
&storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT, 2),
mockHintsResponse(block2),
}}: {block2, block4},
},
// Second attempt returns 1 missing block.
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "3.3.3.3", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2),
mockHintsResponse(block3),
}}: {block3, block4},
},
// Third attempt returns the last missing block.
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "4.4.4.4", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series2Label}, minT+1, 3),
mockHintsResponse(block4),
}}: {block4},
},
},
limits: &blocksStoreLimitsMock{},
queryLimiter: limiter.NewQueryLimiter(0, 0, 3),
- expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, fmt.Sprintf("{__name__=%q}", metricName), 3)),
+ expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, 3)),
},
"max series per query limit hit while fetching chunks": {
finderResult: bucketindex.Blocks{
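The new "- global" test cases mirror the store-limit cases but flip which mechanism trips: blocksStoreLimitsMock{maxChunksPerQuery: N} exercises the store-gateway budget and expects errMaxChunksPerQueryLimit, while limiter.NewQueryLimiter(0, 0, N) exercises the global limiter (the two zeros disable the series and chunk-bytes limits) and expects limiter.ErrMaxChunksPerQueryLimit. An illustrative test in the same spirit, not taken from the diff, assuming the file's existing fmt, limiter, and testify require imports:

func TestQueryLimiterMaxChunksSketch(t *testing.T) {
	ql := limiter.NewQueryLimiter(0, 0, 3) // series/bytes limits off, 3 chunks max
	require.NoError(t, ql.AddChunks(2))    // 2 <= 3: allowed
	err := ql.AddChunks(2)                 // 4 > 3: trips the global limit
	require.EqualError(t, err, fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, 3))
}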
2 changes: 1 addition & 1 deletion pkg/querier/querier.go
@@ -224,7 +224,7 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter,
return nil, err
}

- ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxFetchedChunkBytesPerQuery(userID), limits.MaxChunksPerQueryFromStore(userID)))
+ ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxFetchedChunkBytesPerQuery(userID), limits.MaxChunksPerQuery(userID)))

mint, maxt, err = validateQueryTimeRange(ctx, userID, mint, maxt, limits, cfg.MaxQueryIntoFuture)
if err == errEmptyTimeRange {
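The one-word change above swaps which override feeds the limiter: MaxChunksPerQuery (the new global limit) instead of MaxChunksPerQueryFromStore (now store-gateway-only). The limiter rides the request context so the distributor and the blocks-store querier charge the same counters. A sketch of that context pattern; the disabled-limiter fallback for a missing value is an assumption, not shown in this diff:

package main

import (
	"context"
	"fmt"
)

type queryLimiterCtxKey struct{}

type QueryLimiter struct{ maxChunksPerQuery int }

func AddQueryLimiterToContext(ctx context.Context, ql *QueryLimiter) context.Context {
	return context.WithValue(ctx, queryLimiterCtxKey{}, ql)
}

// QueryLimiterFromContext returns the limiter stored in ctx; returning a
// zero-valued (fully disabled) limiter when none is present is assumed.
func QueryLimiterFromContext(ctx context.Context) *QueryLimiter {
	if ql, ok := ctx.Value(queryLimiterCtxKey{}).(*QueryLimiter); ok {
		return ql
	}
	return &QueryLimiter{} // zero limits: everything disabled
}

func main() {
	ctx := AddQueryLimiterToContext(context.Background(), &QueryLimiter{maxChunksPerQuery: 5})
	fmt.Println(QueryLimiterFromContext(ctx).maxChunksPerQuery) // 5
}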
8 changes: 3 additions & 5 deletions pkg/util/limiter/query_limiter.go
@@ -6,12 +6,10 @@ import (
"sync"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
)

type queryLimiterCtxKey struct{}
@@ -20,7 +18,7 @@ var (
ctxKey = &queryLimiterCtxKey{}
ErrMaxSeriesHit = "the query hit the max number of series limit (limit: %d series)"
ErrMaxChunkBytesHit = "the query hit the aggregated chunks size limit (limit: %d bytes)"
ErrMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from ingesters for %s (limit: %d)"
ErrMaxChunksPerQueryLimit = "the query hit the max number of chunks limit (limit: %d chunks)"
)

type QueryLimiter struct {
Expand Down Expand Up @@ -100,13 +98,13 @@ func (ql *QueryLimiter) AddChunkBytes(chunkSizeInBytes int) error {
return nil
}

- func (ql *QueryLimiter) AddChunks(count int, matchers []*labels.Matcher) error {
+ func (ql *QueryLimiter) AddChunks(count int) error {
if ql.maxChunksPerQuery == 0 {
return nil
}

if ql.chunkCount.Add(int64(count)) > int64(ql.maxChunksPerQuery) {
- return fmt.Errorf(fmt.Sprintf(ErrMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), ql.maxChunksPerQuery))
+ return fmt.Errorf(fmt.Sprintf(ErrMaxChunksPerQueryLimit, ql.maxChunksPerQuery))
}
return nil
}
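AddChunks can be called concurrently from many ingester and store-gateway streams, which is why the counter is a go.uber.org/atomic value rather than a mutex-guarded int: Add returns the post-increment total, so the comparison against the limit is race-free. A compilable sketch of the same pattern, trimmed to the chunk counter (it passes the format string straight to fmt.Errorf; the diff's fmt.Errorf(fmt.Sprintf(...)) produces the same message):

package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

type QueryLimiter struct {
	chunkCount        atomic.Int64
	maxChunksPerQuery int
}

func (ql *QueryLimiter) AddChunks(count int) error {
	if ql.maxChunksPerQuery == 0 { // 0 means the limit is disabled
		return nil
	}
	// Add returns the new total, so each concurrent caller compares a
	// consistent post-increment value against the limit.
	if ql.chunkCount.Add(int64(count)) > int64(ql.maxChunksPerQuery) {
		return fmt.Errorf("the query hit the max number of chunks limit (limit: %d chunks)", ql.maxChunksPerQuery)
	}
	return nil
}

func main() {
	ql := &QueryLimiter{maxChunksPerQuery: 100}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ { // eight streams racing to add chunks
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 20; j++ {
				_ = ql.AddChunks(1) // calls past 100 return errors
			}
		}()
	}
	wg.Wait()
	fmt.Println(ql.chunkCount.Load()) // 160: the count keeps growing, but the limit tripped at 100
}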
4 changes: 4 additions & 0 deletions pkg/util/validation/limits.go
@@ -388,6 +388,10 @@ func (o *Overrides) MaxChunksPerQueryFromStore(userID string) int {
return o.getOverridesForUser(userID).MaxChunksPerQueryFromStore
}

+ func (o *Overrides) MaxChunksPerQuery(userID string) int {
+ return o.getOverridesForUser(userID).MaxChunksPerQuery
+ }

// MaxFetchedSeriesPerQuery returns the maximum number of series allowed per query when fetching
// chunks from ingesters and blocks storage.
func (o *Overrides) MaxFetchedSeriesPerQuery(userID string) int {
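MaxChunksPerQuery joins MaxChunksPerQueryFromStore as a per-tenant accessor: the former feeds the global QueryLimiter wired up in querier.go above, the latter feeds the store-gateway budget. A minimal sketch of the overrides-lookup shape; the fall-back-to-defaults behavior is assumed rather than shown in this diff:

package main

import "fmt"

// Limits holds per-tenant settings; 0 disables a limit.
type Limits struct {
	MaxChunksPerQueryFromStore int // store-gateway-only budget
	MaxChunksPerQuery          int // global per-query limit
}

// Overrides resolves a tenant's limits, assumed to fall back to defaults
// when no per-tenant entry exists.
type Overrides struct {
	defaults Limits
	tenants  map[string]Limits
}

func (o *Overrides) getOverridesForUser(userID string) Limits {
	if l, ok := o.tenants[userID]; ok {
		return l
	}
	return o.defaults
}

func (o *Overrides) MaxChunksPerQuery(userID string) int {
	return o.getOverridesForUser(userID).MaxChunksPerQuery
}

func main() {
	o := &Overrides{
		defaults: Limits{MaxChunksPerQuery: 2000000},
		tenants:  map[string]Limits{"team-a": {MaxChunksPerQuery: 500000}},
	}
	fmt.Println(o.MaxChunksPerQuery("team-a")) // 500000
	fmt.Println(o.MaxChunksPerQuery("team-b")) // 2000000
}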
