Skip to content

Commit

Permalink
Apply bytes limiter to LabelNames and LabelValuesForLabelNames (#6568)
Browse files Browse the repository at this point in the history
* Add bytes limiter to LabelValuesForLabelNames

Signed-off-by: Justin Jung <[email protected]>

* Add bytes limiter to LabelNames

Signed-off-by: Justin Jung <[email protected]>

* Fix bug in MetricsForLabelMatchersStream where bytes limiter overwrote err

Signed-off-by: Justin Jung <[email protected]>

* changelog

Signed-off-by: Justin Jung <[email protected]>

* Apply bytes limiter to LabelNames and LabelValues in block store queryable

Signed-off-by: Justin Jung <[email protected]>

---------

Signed-off-by: Justin Jung <[email protected]>
Signed-off-by: Ben Ye <[email protected]>
Co-authored-by: Ben Ye <[email protected]>
  • Loading branch information
justinjung04 and yeya24 authored Feb 3, 2025
1 parent 3d7aa9a commit e8b8f90
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master / unreleased

* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
* [ENHANCEMENT] Querier: Apply bytes limiter to LabelNames and LabelValuesForLabelNames. #6568
* [ENHANCEMENT] Query Frontend: Add a `too_many_tenants` reason label value to `cortex_rejected_queries_total` metric to track the rejected query count due to the # of tenant limits. #6569
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517

Expand Down
43 changes: 29 additions & 14 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1188,7 +1188,7 @@ func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring
})
}

func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, to model.Time, labelName model.LabelName, hints *storage.LabelHints, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, to model.Time, labelName model.LabelName, hints *storage.LabelHints, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest, limiter *limiter.QueryLimiter) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.LabelValues", opentracing.Tags{
"name": labelName,
"start": from.Unix(),
Expand All @@ -1205,7 +1205,8 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
return nil, err
}

resps, err := f(ctx, replicationSet, req)
queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
resps, err := f(ctx, replicationSet, req, queryLimiter)
if err != nil {
return nil, err
}
Expand All @@ -1229,20 +1230,23 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t

// LabelValuesForLabelName returns all the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest, queryLimiter *limiter.QueryLimiter) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.LabelValues(ctx, req)
if err != nil {
return nil, err
}
if err := queryLimiter.AddDataBytes(resp.Size()); err != nil {
return nil, validation.LimitError(err.Error())
}
return resp.LabelValues, nil
})
}, matchers...)
}

// LabelValuesForLabelNameStream returns all the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest, queryLimiter *limiter.QueryLimiter) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.LabelValuesStream(ctx, req)
if err != nil {
Expand All @@ -1252,12 +1256,15 @@ func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, t
allLabelValues := []string{}
for {
resp, err := stream.Recv()

if err == io.EOF {
break
} else if err != nil {
return nil, err
}
if err := queryLimiter.AddDataBytes(resp.Size()); err != nil {
return nil, validation.LimitError(err.Error())
}

allLabelValues = append(allLabelValues, resp.LabelValues...)
}

Expand All @@ -1266,7 +1273,7 @@ func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, t
}, matchers...)
}

func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, hints *storage.LabelHints, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, hints *storage.LabelHints, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest, limiter *limiter.QueryLimiter) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.LabelNames", opentracing.Tags{
"start": from.Unix(),
"end": to.Unix(),
Expand All @@ -1283,7 +1290,8 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
return nil, err
}

resps, err := f(ctx, replicationSet, req)
queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
resps, err := f(ctx, replicationSet, req, queryLimiter)
if err != nil {
return nil, err
}
Expand All @@ -1308,7 +1316,7 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
}

func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time, hints *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, hints, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
return d.LabelNamesCommon(ctx, from, to, hints, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest, queryLimiter *limiter.QueryLimiter) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.LabelNamesStream(ctx, req)
if err != nil {
Expand All @@ -1318,12 +1326,15 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time,
allLabelNames := []string{}
for {
resp, err := stream.Recv()

if err == io.EOF {
break
} else if err != nil {
return nil, err
}
if err := queryLimiter.AddDataBytes(resp.Size()); err != nil {
return nil, validation.LimitError(err.Error())
}

allLabelNames = append(allLabelNames, resp.LabelNames...)
}

Expand All @@ -1334,12 +1345,16 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time,

// LabelNames returns all the label names.
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
return d.LabelNamesCommon(ctx, from, to, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest, queryLimiter *limiter.QueryLimiter) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.LabelNames(ctx, req)
if err != nil {
return nil, err
}
if err := queryLimiter.AddDataBytes(resp.Size()); err != nil {
return nil, validation.LimitError(err.Error())
}

return resp.LabelNames, nil
})
}, matchers...)
Expand Down Expand Up @@ -1385,15 +1400,15 @@ func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, t
defer stream.CloseSend() //nolint:errcheck
for {
resp, err := stream.Recv()
if err := queryLimiter.AddDataBytes(resp.Size()); err != nil {
return nil, validation.LimitError(err.Error())
}

if err == io.EOF {
break
} else if err != nil {
return nil, err
}
if err := queryLimiter.AddDataBytes(resp.Size()); err != nil {
return nil, validation.LimitError(err.Error())
}

s := make([][]cortexpb.LabelAdapter, 0, len(resp.Metric))
for _, metric := range resp.Metric {
m := cortexpb.FromLabelAdaptersToMetricWithCopy(metric.Labels)
Expand Down
8 changes: 8 additions & 0 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,7 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
spanLog = spanlogger.FromContext(ctx)
merrMtx = sync.Mutex{}
merr = multierror.MultiError{}
queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
)

// Concurrently fetch series from all clients.
Expand Down Expand Up @@ -894,6 +895,9 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
}
return errors.Wrapf(err, "failed to fetch label names from %s", c.RemoteAddress())
}
if dataBytesLimitErr := queryLimiter.AddDataBytes(namesResp.Size()); dataBytesLimitErr != nil {
return validation.LimitError(dataBytesLimitErr.Error())
}

myQueriedBlocks := []ulid.ULID(nil)
if namesResp.Hints != nil {
Expand Down Expand Up @@ -957,6 +961,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
spanLog = spanlogger.FromContext(ctx)
merrMtx = sync.Mutex{}
merr = multierror.MultiError{}
queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
)

// Concurrently fetch series from all clients.
Expand Down Expand Up @@ -997,6 +1002,9 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
}
return errors.Wrapf(err, "failed to fetch label values from %s", c.RemoteAddress())
}
if dataBytesLimitErr := queryLimiter.AddDataBytes(valuesResp.Size()); dataBytesLimitErr != nil {
return validation.LimitError(dataBytesLimitErr.Error())
}

myQueriedBlocks := []ulid.ULID(nil)
if valuesResp.Hints != nil {
Expand Down

0 comments on commit e8b8f90

Please sign in to comment.