Skip to content

Commit

Permalink
Make use of LabelHints.Limit for LabelNames and LabelValues reque…
Browse files Browse the repository at this point in the history
…sts (#8805) (#10410)

* Use LabelHints.Limit

WIP

Add limit field on LabelNamesRequest and LabelValuesRequest

Add support for limit on gateway.proto

Rename variables to keep consistency

Fix tests

Add tests

Add tests

Add tests

Add tests

Add changelog

Fix changelog

Fix tests

Fix imports

empty commit

Update pkg/distributor/distributor.go

Co-authored-by: Arve Knudsen <[email protected]>

Update pkg/distributor/distributor_test.go

Co-authored-by: Arve Knudsen <[email protected]>

Update pkg/distributor/distributor_test.go

Co-authored-by: Arve Knudsen <[email protected]>

Update pkg/distributor/distributor_test.go

Co-authored-by: Arve Knudsen <[email protected]>

Update pkg/distributor/distributor_test.go

Co-authored-by: Arve Knudsen <[email protected]>

Update pkg/querier/tenantfederation/merge_queryable_test.go

Co-authored-by: Arve Knudsen <[email protected]>

Update pkg/distributor/distributor_test.go

Co-authored-by: Arve Knudsen <[email protected]>

Update pkg/querier/tenantfederation/merge_queryable_test.go

Co-authored-by: Arve Knudsen <[email protected]>

Update pkg/storegateway/bucket.go

Co-authored-by: Arve Knudsen <[email protected]>

Update pkg/storegateway/bucket.go

Co-authored-by: Arve Knudsen <[email protected]>

Update CHANGELOG.md

Co-authored-by: Arve Knudsen <[email protected]>

Update pkg/distributor/distributor.go

Co-authored-by: Arve Knudsen <[email protected]>

Update pkg/distributor/distributor_test.go

Co-authored-by: Arve Knudsen <[email protected]>

Update pkg/distributor/distributor_test.go

Co-authored-by: Arve Knudsen <[email protected]>

Update pkg/distributor/distributor_test.go

Co-authored-by: Arve Knudsen <[email protected]>

Update pkg/ingester/client/compat.go

Co-authored-by: Arve Knudsen <[email protected]>

Update pkg/ingester/client/compat.go

Co-authored-by: Arve Knudsen <[email protected]>

Apply suggestions from code review

Co-authored-by: Arve Knudsen <[email protected]>

Apply suggestions from code review

Co-authored-by: Arve Knudsen <[email protected]>

Fix pipeline

Fix tests

empty commit

* Rebase

* Apply suggestions from code review

Co-authored-by: Arve Knudsen <[email protected]>

* Addressed feedback PR

* Addressed feedback PR

* Addressed feedback PR from charleskorn

* Fix test

* Update CHANGELOG.md

---------

Co-authored-by: Arve Knudsen <[email protected]>
Co-authored-by: Charles Korn <[email protected]>
  • Loading branch information
3 people authored Feb 3, 2025
1 parent ae924bf commit ac097ac
Show file tree
Hide file tree
Showing 18 changed files with 667 additions and 233 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
* `cortex_ingester_tsdb_block_postings_for_matchers_cache_skips_total`
* `cortex_ingester_tsdb_block_postings_for_matchers_cache_evictions_total`
* [ENHANCEMENT] Compactor: Shuffle users' order in `BlocksCleaner`. Prevents bucket indexes from going an extended period without cleanup during compactor restarts. #10513
* [ENHANCEMENT] Distributor, querier, ingester and store-gateway: Add support for `limit` parameter for label names and values requests. #10410
* [BUGFIX] Distributor: Use a boolean to track changes while merging the ReplicaDesc components, rather than comparing the objects directly. #10185
* [BUGFIX] Querier: fix timeout responding to query-frontend when response size is very close to `-querier.frontend-client.grpc-max-send-msg-size`. #10154
* [BUGFIX] Query-frontend and querier: show warning/info annotations in some cases where they were missing (if a lazy querier was used). #10277
Expand Down
17 changes: 13 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -1971,13 +1972,13 @@ func queryIngesterPartitionsRingZoneSorter(preferredZone string) ring.ZoneSorter

// LabelValuesForLabelName returns the label values associated with the given labelName, among all series with samples
// timestamp between from and to, and series labels matching the optional matchers.
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx)
if err != nil {
return nil, err
}

req, err := ingester_client.ToLabelValuesRequest(labelName, from, to, matchers)
req, err := ingester_client.ToLabelValuesRequest(labelName, from, to, hints, matchers)
if err != nil {
return nil, err
}
Expand All @@ -2004,6 +2005,10 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode
// We need the values returned to be sorted.
slices.Sort(values)

if hints != nil && hints.Limit > 0 && len(values) > hints.Limit {
values = values[:hints.Limit]
}

return values, nil
}

Expand Down Expand Up @@ -2693,13 +2698,13 @@ func maxFromZones[T ~float64 | ~uint64](seriesCountByZone map[string]T) (val T)

// LabelNames returns the names of all labels from series with samples timestamp between from and to, and matching
// the input optional series label matchers. The returned label names are sorted.
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) ([]string, error) {
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx)
if err != nil {
return nil, err
}

req, err := ingester_client.ToLabelNamesRequest(from, to, matchers)
req, err := ingester_client.ToLabelNamesRequest(from, to, hints, matchers)
if err != nil {
return nil, err
}
Expand All @@ -2725,6 +2730,10 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, match

slices.Sort(values)

if hints != nil && hints.Limit > 0 && len(values) > hints.Limit {
values = values[:hints.Limit]
}

return values, nil
}

Expand Down
99 changes: 95 additions & 4 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
promtestutil "github.com/prometheus/prometheus/util/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -3290,6 +3291,7 @@ func TestDistributor_LabelNames(t *testing.T) {

tests := map[string]struct {
shuffleShardSize int
hints *storage.LabelHints
matchers []*labels.Matcher
expectedResult []string
expectedIngesters int
Expand All @@ -3308,6 +3310,38 @@ func TestDistributor_LabelNames(t *testing.T) {
expectedResult: []string{labels.MetricName, "reason", "status"},
expectedIngesters: numIngesters,
},
"should filter metrics by single matcher and apply limit": {
hints: &storage.LabelHints{Limit: 2},
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
expectedResult: []string{labels.MetricName, "reason"},
expectedIngesters: numIngesters,
},
"should filter metrics by single matcher and ignore limit when it is zero": {
hints: &storage.LabelHints{Limit: 0},
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
expectedResult: []string{labels.MetricName, "reason", "status"},
expectedIngesters: numIngesters,
},
"should filter metrics by single matcher and ignore limit when it is equal than the number of items returned": {
hints: &storage.LabelHints{Limit: 3},
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
expectedResult: []string{labels.MetricName, "reason", "status"},
expectedIngesters: numIngesters,
},
"should filter metrics by single matcher and ignore limit when it is greater than the number of items returned": {
hints: &storage.LabelHints{Limit: 5},
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
expectedResult: []string{labels.MetricName, "reason", "status"},
expectedIngesters: numIngesters,
},
"should filter metrics by multiple matchers": {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "status", "200"),
Expand All @@ -3316,6 +3350,15 @@ func TestDistributor_LabelNames(t *testing.T) {
expectedResult: []string{labels.MetricName, "status"},
expectedIngesters: numIngesters,
},
"should filter metrics by multiple matchers and apply limit": {
hints: &storage.LabelHints{Limit: 1},
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "status", "200"),
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
expectedResult: []string{labels.MetricName},
expectedIngesters: numIngesters,
},
"should query only ingesters belonging to tenant's subring if shuffle sharding is enabled": {
shuffleShardSize: 3,
matchers: []*labels.Matcher{
Expand All @@ -3324,6 +3367,15 @@ func TestDistributor_LabelNames(t *testing.T) {
expectedResult: []string{labels.MetricName, "reason", "status"},
expectedIngesters: 3,
},
"should query only ingesters belonging to tenant's subring if shuffle sharding is enabled and apply limit": {
shuffleShardSize: 3,
hints: &storage.LabelHints{Limit: 1},
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
expectedResult: []string{labels.MetricName},
expectedIngesters: 3,
},
}

for testName, testData := range tests {
Expand Down Expand Up @@ -3365,7 +3417,7 @@ func TestDistributor_LabelNames(t *testing.T) {
require.NoError(t, err)
}

names, err := ds[0].LabelNames(ctx, now, now, testData.matchers...)
names, err := ds[0].LabelNames(ctx, now, now, testData.hints, testData.matchers...)
require.NoError(t, err)
assert.ElementsMatch(t, testData.expectedResult, names)

Expand Down Expand Up @@ -3551,6 +3603,7 @@ func TestDistributor_LabelValuesForLabelName(t *testing.T) {
tests := map[string]struct {
from, to model.Time
expectedLabelValues []string
hints *storage.LabelHints
matchers []*labels.Matcher
}{
"all time selected, no matchers": {
Expand All @@ -3569,6 +3622,30 @@ func TestDistributor_LabelValuesForLabelName(t *testing.T) {
expectedLabelValues: []string{"label_1"},
matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "reason", "broken")},
},
"all time selected, no matchers, hints provided without limit": {
from: 0,
to: 300_000,
hints: &storage.LabelHints{Limit: 0},
expectedLabelValues: []string{"label_0", "label_1"},
},
"all time selected, no matchers, limit provided": {
from: 0,
to: 300_000,
hints: &storage.LabelHints{Limit: 1},
expectedLabelValues: []string{"label_0"},
},
"all time selected, no matchers, limit equal to number of label values": {
from: 0,
to: 300_000,
hints: &storage.LabelHints{Limit: 2},
expectedLabelValues: []string{"label_0", "label_1"},
},
"all time selected, no matchers, limit greater than number of label values": {
from: 0,
to: 300_000,
hints: &storage.LabelHints{Limit: 4},
expectedLabelValues: []string{"label_0", "label_1"},
},
}

for testName, testCase := range tests {
Expand Down Expand Up @@ -3599,7 +3676,7 @@ func TestDistributor_LabelValuesForLabelName(t *testing.T) {
require.NoError(t, err)
}

response, err := ds[0].LabelValuesForLabelName(ctx, testCase.from, testCase.to, labels.MetricName, testCase.matchers...)
response, err := ds[0].LabelValuesForLabelName(ctx, testCase.from, testCase.to, labels.MetricName, testCase.hints, testCase.matchers...)
require.NoError(t, err)
assert.ElementsMatch(t, response, testCase.expectedLabelValues)
})
Expand Down Expand Up @@ -6447,7 +6524,7 @@ func (i *mockIngester) LabelValues(ctx context.Context, req *client.LabelValuesR
return nil, errFail
}

labelName, from, to, matchers, err := client.FromLabelValuesRequest(req)
labelName, from, to, hints, matchers, err := client.FromLabelValuesRequest(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -6481,6 +6558,9 @@ func (i *mockIngester) LabelValues(ctx context.Context, req *client.LabelValuesR

slices.Sort(response)

if hints != nil && hints.Limit > 0 && len(response) > hints.Limit {
response = response[:hints.Limit]
}
return &client.LabelValuesResponse{LabelValues: response}, nil
}

Expand All @@ -6498,21 +6578,32 @@ func (i *mockIngester) LabelNames(ctx context.Context, req *client.LabelNamesReq
return nil, errFail
}

_, _, matchers, err := client.FromLabelNamesRequest(req)
_, _, hints, matchers, err := client.FromLabelNamesRequest(req)
if err != nil {
return nil, err
}

response := client.LabelNamesResponse{}
labelsSet := map[string]struct{}{}

for _, ts := range i.timeseries {
if match(ts.Labels, matchers) {
for _, lbl := range ts.Labels {
if _, ok := labelsSet[lbl.Name]; ok {
continue
}

labelsSet[lbl.Name] = struct{}{}
response.LabelNames = append(response.LabelNames, lbl.Name)
}
}
}
slices.Sort(response.LabelNames)

if hints != nil && hints.Limit > 0 && len(response.LabelNames) > hints.Limit {
response.LabelNames = response.LabelNames[:hints.Limit]
}

return &response, nil
}

Expand Down
39 changes: 30 additions & 9 deletions pkg/ingester/client/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"

"github.com/grafana/mimir/pkg/mimirpb"
)
Expand Down Expand Up @@ -108,62 +109,82 @@ func FromMetricsForLabelMatchersResponse(resp *MetricsForLabelMatchersResponse)
}

// ToLabelValuesRequest builds a LabelValuesRequest proto
func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, matchers []*labels.Matcher) (*LabelValuesRequest, error) {
func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, hints *storage.LabelHints, matchers []*labels.Matcher) (*LabelValuesRequest, error) {
ms, err := ToLabelMatchers(matchers)
if err != nil {
return nil, err
}

var limit int64
if hints != nil && hints.Limit > 0 {
limit = int64(hints.Limit)
}
return &LabelValuesRequest{
LabelName: string(labelName),
StartTimestampMs: int64(from),
EndTimestampMs: int64(to),
Matchers: &LabelMatchers{Matchers: ms},
Limit: limit,
}, nil
}

// FromLabelValuesRequest unpacks a LabelValuesRequest proto
func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, []*labels.Matcher, error) {
func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, *storage.LabelHints, []*labels.Matcher, error) {
var err error
var hints *storage.LabelHints
var matchers []*labels.Matcher

if req.Matchers != nil {
matchers, err = FromLabelMatchers(req.Matchers.Matchers)
if err != nil {
return "", 0, 0, nil, err
return "", 0, 0, nil, nil, err
}
}

return req.LabelName, req.StartTimestampMs, req.EndTimestampMs, matchers, nil
if req.Limit > 0 {
hints = &storage.LabelHints{Limit: int(req.Limit)}
}

return req.LabelName, req.StartTimestampMs, req.EndTimestampMs, hints, matchers, nil
}

// ToLabelNamesRequest builds a LabelNamesRequest proto
func ToLabelNamesRequest(from, to model.Time, matchers []*labels.Matcher) (*LabelNamesRequest, error) {
func ToLabelNamesRequest(from, to model.Time, hints *storage.LabelHints, matchers []*labels.Matcher) (*LabelNamesRequest, error) {
ms, err := ToLabelMatchers(matchers)
if err != nil {
return nil, err
}

var limit int64
if hints != nil && hints.Limit > 0 {
limit = int64(hints.Limit)
}

return &LabelNamesRequest{
StartTimestampMs: int64(from),
EndTimestampMs: int64(to),
Matchers: &LabelMatchers{Matchers: ms},
Limit: limit,
}, nil
}

// FromLabelNamesRequest unpacks a LabelNamesRequest proto
func FromLabelNamesRequest(req *LabelNamesRequest) (int64, int64, []*labels.Matcher, error) {
func FromLabelNamesRequest(req *LabelNamesRequest) (int64, int64, *storage.LabelHints, []*labels.Matcher, error) {
var err error
var hints *storage.LabelHints
var matchers []*labels.Matcher

if req.Matchers != nil {
matchers, err = FromLabelMatchers(req.Matchers.Matchers)
if err != nil {
return 0, 0, nil, err
return 0, 0, nil, nil, err
}
}

return req.StartTimestampMs, req.EndTimestampMs, matchers, nil
if req.Limit != 0 {
hints = &storage.LabelHints{Limit: int(req.Limit)}
}

return req.StartTimestampMs, req.EndTimestampMs, hints, matchers, nil
}

func ToActiveSeriesRequest(matchers []*labels.Matcher) (*ActiveSeriesRequest, error) {
Expand Down
7 changes: 5 additions & 2 deletions pkg/ingester/client/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -73,15 +74,17 @@ func TestLabelNamesRequest(t *testing.T) {
mint, maxt = 0, 10
)

hints := &storage.LabelHints{Limit: 10}
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}

req, err := ToLabelNamesRequest(mint, maxt, matchers)
req, err := ToLabelNamesRequest(mint, maxt, hints, matchers)
require.NoError(t, err)

actualMinT, actualMaxT, actualMatchers, err := FromLabelNamesRequest(req)
actualMinT, actualMaxT, actualHints, actualMatchers, err := FromLabelNamesRequest(req)
require.NoError(t, err)

assert.Equal(t, int64(mint), actualMinT)
assert.Equal(t, int64(maxt), actualMaxT)
assert.Equal(t, hints, actualHints)
assert.Equal(t, matchers, actualMatchers)
}
Loading

0 comments on commit ac097ac

Please sign in to comment.