Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce series and sample limits on streaming queries to ingester from querier #3873

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ import (
var (
errFail = fmt.Errorf("Fail")
emptyResponse = &cortexpb.WriteResponse{}
ctx = user.InjectOrgID(context.Background(), "user")
testUserID = "user"
ctx = user.InjectOrgID(context.Background(), testUserID)
)

func TestConfig_Validate(t *testing.T) {
Expand Down Expand Up @@ -710,12 +711,12 @@ func TestDistributor_PushQuery(t *testing.T) {
assert.Equal(t, &cortexpb.WriteResponse{}, writeResponse)
assert.Nil(t, err)

response, err := ds[0].Query(ctx, 0, 10, tc.matchers...)
response, err := ds[0].Query(ctx, testUserID, 0, 10, tc.matchers...)
sort.Sort(response)
assert.Equal(t, tc.expectedResponse, response)
assert.Equal(t, tc.expectedError, err)

series, err := ds[0].QueryStream(ctx, 0, 10, tc.matchers...)
series, err := ds[0].QueryStream(ctx, testUserID, 0, 10, tc.matchers...)
assert.Equal(t, tc.expectedError, err)

if series == nil {
Expand Down Expand Up @@ -1006,10 +1007,10 @@ func TestSlowQueries(t *testing.T) {
})
defer stopAll(ds, r)

_, err := ds[0].Query(ctx, 0, 10, nameMatcher)
_, err := ds[0].Query(ctx, testUserID, 0, 10, nameMatcher)
assert.Equal(t, expectedErr, err)

_, err = ds[0].QueryStream(ctx, 0, 10, nameMatcher)
_, err = ds[0].QueryStream(ctx, testUserID, 0, 10, nameMatcher)
assert.Equal(t, expectedErr, err)
})
}
Expand Down
37 changes: 25 additions & 12 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package distributor

import (
"context"
"fmt"
"io"
"time"

Expand All @@ -19,15 +20,15 @@ import (
)

// Query multiple ingesters and returns a Matrix of samples.
func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
func (d *Distributor) Query(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we enforce the limits here as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes; my focus was on things that blow up, which a range query is much more likely to.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Query() is used for range queries too, no? It's used when the "gRPC streaming" is disabled, while "QueryStream()" is called when it's enabled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry, I got confused.

For non-streaming queries and chunks, ingester enforces the limits already.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For non-streaming queries and chunks, ingester enforces the limits already.

Where is it done? I can't find it.

var matrix model.Matrix
err := instrument.CollectedRequest(ctx, "Distributor.Query", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error {
req, err := ingester_client.ToQueryRequest(from, to, matchers)
if err != nil {
return err
}

replicationSet, err := d.GetIngestersForQuery(ctx, matchers...)
replicationSet, err := d.GetIngestersForQuery(ctx, userID, matchers...)
if err != nil {
return err
}
Expand All @@ -46,20 +47,20 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
}

// QueryStream multiple ingesters via the streaming interface and returns big ol' set of chunks.
func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error) {
func (d *Distributor) QueryStream(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error) {
var result *ingester_client.QueryStreamResponse
err := instrument.CollectedRequest(ctx, "Distributor.QueryStream", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error {
req, err := ingester_client.ToQueryRequest(from, to, matchers)
if err != nil {
return err
}

replicationSet, err := d.GetIngestersForQuery(ctx, matchers...)
replicationSet, err := d.GetIngestersForQuery(ctx, userID, matchers...)
if err != nil {
return err
}

result, err = d.queryIngesterStream(ctx, replicationSet, req)
result, err = d.queryIngesterStream(ctx, userID, replicationSet, req)
if err != nil {
return err
}
Expand All @@ -74,12 +75,7 @@ func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matc

// GetIngestersForQuery returns a replication set including all ingesters that should be queried
// to fetch series matching input label matchers.
func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*labels.Matcher) (ring.ReplicationSet, error) {
userID, err := tenant.TenantID(ctx)
if err != nil {
return ring.ReplicationSet{}, err
}

func (d *Distributor) GetIngestersForQuery(ctx context.Context, userID string, matchers ...*labels.Matcher) (ring.ReplicationSet, error) {
// If shuffle sharding is enabled we should only query ingesters which are
// part of the tenant's subring.
if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
Expand Down Expand Up @@ -172,7 +168,10 @@ func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.Re
}

// queryIngesterStream queries the ingesters using the new streaming API.
func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) {
func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) {
maxSeries := d.limits.MaxSeriesPerQuery(userID)
maxSamples := d.limits.MaxSamplesPerQuery(userID)

// Fetch samples from multiple ingesters
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
Expand Down Expand Up @@ -204,6 +203,10 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri

result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...)
result.Timeseries = append(result.Timeseries, resp.Timeseries...)

if len(result.Chunkseries) > maxSeries || len(result.Timeseries) > maxSeries {
return nil, fmt.Errorf("exceeded maximum number of series in a query (limit %d)", maxSeries)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the implications on the returned status code? I've the feeling this may be detected as a storage error and we return a 5xx error (while it should be a 4xx) but I haven't deeply checked it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it would work better as an httpgrpc error?

Copy link
Contributor

@pracucci pracucci Mar 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bboreham I don't remember how the error code propagation works in detail but I would start testing it. Do you have time/interest to work on it, otherwise I can takeover, cause I'm interested into this limit too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked it and we have to return a validation.LimitError.

}
}
return result, nil
})
Expand All @@ -214,6 +217,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
hashToChunkseries := map[string]ingester_client.TimeSeriesChunk{}
hashToTimeSeries := map[string]ingester_client.TimeSeries{}

sampleCount := 0
for _, result := range results {
response := result.(*ingester_client.QueryStreamResponse)

Expand All @@ -231,15 +235,24 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
key := ingester_client.LabelsToKeyString(ingester_client.FromLabelAdaptersToLabels(series.Labels))
existing := hashToTimeSeries[key]
existing.Labels = series.Labels
previousCount := len(existing.Samples)
if existing.Samples == nil {
existing.Samples = series.Samples
} else {
existing.Samples = mergeSamples(existing.Samples, series.Samples)
}
hashToTimeSeries[key] = existing
sampleCount += len(existing.Samples) - previousCount
if sampleCount > maxSamples {
return nil, fmt.Errorf("exceeded maximum number of samples in a query (limit %d)", maxSamples)
}
}
}

if len(hashToChunkseries) > maxSeries || len(hashToTimeSeries) > maxSeries {
return nil, fmt.Errorf("exceeded maximum number of series in a query (limit %d)", maxSeries)
}

resp := &ingester_client.QueryStreamResponse{
Chunkseries: make([]ingester_client.TimeSeriesChunk, 0, len(hashToChunkseries)),
Timeseries: make([]ingester_client.TimeSeries, 0, len(hashToTimeSeries)),
Expand Down
22 changes: 11 additions & 11 deletions pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
// Distributor is the read interface to the distributor, made an interface here
// to reduce package coupling.
type Distributor interface {
Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error)
QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error)
Query(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error)
QueryStream(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error)
LabelValuesForLabelName(ctx context.Context, from, to model.Time, label model.LabelName, matchers ...*labels.Matcher) ([]string, error)
LabelNames(context.Context, model.Time, model.Time) ([]string, error)
MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error)
Expand Down Expand Up @@ -116,11 +116,16 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
}
}

userID, err := tenant.TenantID(q.ctx)
if err != nil {
return storage.ErrSeriesSet(err)
}

if q.streaming {
return q.streamingSelect(ctx, minT, maxT, matchers)
return q.streamingSelect(ctx, userID, minT, maxT, matchers)
}

matrix, err := q.distributor.Query(ctx, model.Time(minT), model.Time(maxT), matchers...)
matrix, err := q.distributor.Query(ctx, userID, model.Time(minT), model.Time(maxT), matchers...)
if err != nil {
return storage.ErrSeriesSet(err)
}
Expand All @@ -129,13 +134,8 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
return series.MatrixToSeriesSet(matrix)
}

func (q *distributorQuerier) streamingSelect(ctx context.Context, minT, maxT int64, matchers []*labels.Matcher) storage.SeriesSet {
userID, err := tenant.TenantID(ctx)
if err != nil {
return storage.ErrSeriesSet(err)
}

results, err := q.distributor.QueryStream(ctx, model.Time(minT), model.Time(maxT), matchers...)
func (q *distributorQuerier) streamingSelect(ctx context.Context, userID string, minT, maxT int64, matchers []*labels.Matcher) storage.SeriesSet {
results, err := q.distributor.QueryStream(ctx, userID, model.Time(minT), model.Time(maxT), matchers...)
if err != nil {
return storage.ErrSeriesSet(err)
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/querier/distributor_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

const (
maxt, mint = 0, 10
testUserID = "test"
)

func TestDistributorQuerier(t *testing.T) {
Expand All @@ -45,8 +46,9 @@ func TestDistributorQuerier(t *testing.T) {
},
nil)

ctx := user.InjectOrgID(context.Background(), testUserID)
queryable := newDistributorQueryable(d, false, nil, 0)
querier, err := queryable.Querier(context.Background(), mint, maxt)
querier, err := queryable.Querier(ctx, mint, maxt)
require.NoError(t, err)

seriesSet := querier.Select(true, &storage.SelectHints{Start: mint, End: maxt})
Expand Down Expand Up @@ -118,10 +120,10 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
t.Run(fmt.Sprintf("%s (streaming enabled: %t)", testName, streamingEnabled), func(t *testing.T) {
distributor := &mockDistributor{}
distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil)
distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil)
distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil)
distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]metric.Metric{}, nil)

ctx := user.InjectOrgID(context.Background(), "test")
ctx := user.InjectOrgID(context.Background(), testUserID)
queryable := newDistributorQueryable(distributor, streamingEnabled, nil, testData.queryIngestersWithin)
querier, err := queryable.Querier(ctx, testData.queryMinT, testData.queryMaxT)
require.NoError(t, err)
Expand Down Expand Up @@ -330,11 +332,11 @@ type mockDistributor struct {
mock.Mock
}

func (m *mockDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
func (m *mockDistributor) Query(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
args := m.Called(ctx, from, to, matchers)
return args.Get(0).(model.Matrix), args.Error(1)
}
func (m *mockDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) {
func (m *mockDistributor) QueryStream(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) {
args := m.Called(ctx, from, to, matchers)
return args.Get(0).(*client.QueryStreamResponse), args.Error(1)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,10 +737,10 @@ type errDistributor struct{}

var errDistributorError = fmt.Errorf("errDistributorError")

func (m *errDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
func (m *errDistributor) Query(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
return nil, errDistributorError
}
func (m *errDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) {
func (m *errDistributor) QueryStream(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) {
return nil, errDistributorError
}
func (m *errDistributor) LabelValuesForLabelName(context.Context, model.Time, model.Time, model.LabelName, ...*labels.Matcher) ([]string, error) {
Expand Down Expand Up @@ -777,11 +777,11 @@ func (c *emptyChunkStore) IsCalled() bool {

type emptyDistributor struct{}

func (d *emptyDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
func (d *emptyDistributor) Query(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
return nil, nil
}

func (d *emptyDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) {
func (d *emptyDistributor) QueryStream(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) {
return &client.QueryStreamResponse{}, nil
}

Expand Down