Skip to content

Commit 9b7a634

Browse files
[release-2.9.x] Return intersecting labels when aggregating volume by label (#10329)
Backport 4d2a94e from #10248 --- This PR changes the behavior the `index/volume` endpoint to return intersecting labels when requesting volume aggregating by label. Previously it only returned labels that matched selectors in the query. The old behavior is still possible using the `targetLabels` argument. Co-authored-by: Trevor Whitney <[email protected]>
1 parent 1761625 commit 9b7a634

File tree

4 files changed

+70
-33
lines changed

4 files changed

+70
-33
lines changed

pkg/ingester/instance.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,8 @@ func (i *instance) GetVolume(ctx context.Context, req *logproto.VolumeRequest) (
633633
return nil, err
634634
}
635635

636-
labelsToMatch, matchers, matchAny := util.PrepareLabelsAndMatchers(req.TargetLabels, matchers)
636+
targetLabels := req.TargetLabels
637+
labelsToMatch, matchers, matchAny := util.PrepareLabelsAndMatchers(targetLabels, matchers)
637638
matchAny = matchAny || len(matchers) == 0
638639

639640
seriesNames := make(map[uint64]string)
@@ -673,7 +674,11 @@ func (i *instance) GetVolume(ctx context.Context, req *logproto.VolumeRequest) (
673674
} else {
674675
labelVolumes = make(map[string]uint64, len(s.labels))
675676
for _, l := range s.labels {
676-
if _, ok := labelsToMatch[l.Name]; matchAny || ok {
677+
if len(targetLabels) > 0 {
678+
if _, ok := labelsToMatch[l.Name]; matchAny || ok {
679+
labelVolumes[l.Name] += size
680+
}
681+
} else {
677682
labelVolumes[l.Name] += size
678683
}
679684
}

pkg/ingester/instance_test.go

+36-22
Original file line numberDiff line numberDiff line change
@@ -845,11 +845,17 @@ func TestInstance_Volume(t *testing.T) {
845845
err := instance.Push(context.TODO(), &logproto.PushRequest{
846846
Streams: []logproto.Stream{
847847
{
848-
Labels: `{host="other"}`,
848+
Labels: `{fizz="buzz", host="other"}`,
849849
Entries: []logproto.Entry{
850850
{Timestamp: time.Unix(0, 1e6), Line: `msg="other"`},
851851
},
852852
},
853+
{
854+
Labels: `{foo="bar", host="other", log_stream="worker"}`,
855+
Entries: []logproto.Entry{
856+
{Timestamp: time.Unix(0, 1e6), Line: `msg="other worker"`},
857+
},
858+
},
853859
},
854860
})
855861
require.NoError(t, err)
@@ -864,15 +870,16 @@ func TestInstance_Volume(t *testing.T) {
864870
From: 0,
865871
Through: 1.1 * 1e3, //milliseconds
866872
Matchers: "{}",
867-
Limit: 3,
873+
Limit: 5,
868874
AggregateBy: seriesvolume.Series,
869875
})
870876
require.NoError(t, err)
871877

872878
require.Equal(t, []logproto.Volume{
873879
{Name: `{host="agent", job="3", log_stream="dispatcher"}`, Volume: 90},
874880
{Name: `{host="agent", job="3", log_stream="worker"}`, Volume: 70},
875-
{Name: `{host="other"}`, Volume: 11},
881+
{Name: `{foo="bar", host="other", log_stream="worker"}`, Volume: 18},
882+
{Name: `{fizz="buzz", host="other"}`, Volume: 11},
876883
}, volumes.Volumes)
877884
})
878885

@@ -882,7 +889,7 @@ func TestInstance_Volume(t *testing.T) {
882889
From: 0,
883890
Through: 1.1 * 1e3, //milliseconds
884891
Matchers: `{log_stream="dispatcher"}`,
885-
Limit: 3,
892+
Limit: 5,
886893
AggregateBy: seriesvolume.Series,
887894
})
888895
require.NoError(t, err)
@@ -898,7 +905,7 @@ func TestInstance_Volume(t *testing.T) {
898905
From: 5,
899906
Through: 1.1 * 1e3, //milliseconds
900907
Matchers: "{}",
901-
Limit: 3,
908+
Limit: 5,
902909
AggregateBy: seriesvolume.Series,
903910
})
904911
require.NoError(t, err)
@@ -932,15 +939,15 @@ func TestInstance_Volume(t *testing.T) {
932939
From: 0,
933940
Through: 1.1 * 1e3, //milliseconds
934941
Matchers: `{}`,
935-
Limit: 3,
942+
Limit: 5,
936943
TargetLabels: []string{"log_stream"},
937944
AggregateBy: seriesvolume.Series,
938945
})
939946
require.NoError(t, err)
940947

941948
require.Equal(t, []logproto.Volume{
942949
{Name: `{log_stream="dispatcher"}`, Volume: 90},
943-
{Name: `{log_stream="worker"}`, Volume: 70},
950+
{Name: `{log_stream="worker"}`, Volume: 88},
944951
}, volumes.Volumes)
945952
})
946953

@@ -950,7 +957,7 @@ func TestInstance_Volume(t *testing.T) {
950957
From: 0,
951958
Through: 1.1 * 1e3, //milliseconds
952959
Matchers: `{log_stream="dispatcher"}`,
953-
Limit: 3,
960+
Limit: 5,
954961
TargetLabels: []string{"host"},
955962
AggregateBy: seriesvolume.Series,
956963
})
@@ -967,7 +974,7 @@ func TestInstance_Volume(t *testing.T) {
967974
From: 0,
968975
Through: 1.1 * 1e3, //milliseconds
969976
Matchers: `{log_stream=~".+"}`,
970-
Limit: 3,
977+
Limit: 5,
971978
TargetLabels: []string{"host", "job"},
972979
AggregateBy: seriesvolume.Series,
973980
})
@@ -987,32 +994,39 @@ func TestInstance_Volume(t *testing.T) {
987994
From: 0,
988995
Through: 1.1 * 1e3, //milliseconds
989996
Matchers: "{}",
990-
Limit: 3,
997+
Limit: 5,
991998
AggregateBy: seriesvolume.Labels,
992999
})
9931000
require.NoError(t, err)
9941001

9951002
require.Equal(t, []logproto.Volume{
996-
{Name: `host`, Volume: 171},
1003+
{Name: `host`, Volume: 189},
1004+
{Name: `log_stream`, Volume: 178},
9971005
{Name: `job`, Volume: 160},
998-
{Name: `log_stream`, Volume: 160},
1006+
{Name: `foo`, Volume: 18},
1007+
{Name: `fizz`, Volume: 11},
9991008
}, volumes.Volumes)
10001009
})
10011010

1002-
t.Run("with matchers", func(t *testing.T) {
1011+
t.Run("with matchers it returns intersecting labels", func(t *testing.T) {
10031012
instance := prepareInstance(t)
10041013
volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{
10051014
From: 0,
10061015
Through: 1.1 * 1e3, //milliseconds
1007-
Matchers: `{log_stream="dispatcher"}`,
1008-
Limit: 3,
1016+
Matchers: `{log_stream="worker"}`,
1017+
Limit: 5,
10091018
AggregateBy: seriesvolume.Labels,
10101019
})
10111020
require.NoError(t, err)
10121021

10131022
require.Equal(t, []logproto.Volume{
1014-
{Name: `log_stream`, Volume: 90},
1023+
{Name: `host`, Volume: 88},
1024+
{Name: `log_stream`, Volume: 88},
1025+
{Name: `job`, Volume: 70},
1026+
{Name: `foo`, Volume: 18},
10151027
}, volumes.Volumes)
1028+
1029+
require.NotContains(t, volumes.Volumes, logproto.Volume{Name: `fizz`, Volume: 11})
10161030
})
10171031

10181032
t.Run("excludes streams outside of time bounds", func(t *testing.T) {
@@ -1021,7 +1035,7 @@ func TestInstance_Volume(t *testing.T) {
10211035
From: 5,
10221036
Through: 1.1 * 1e3, //milliseconds
10231037
Matchers: "{}",
1024-
Limit: 3,
1038+
Limit: 5,
10251039
AggregateBy: seriesvolume.Labels,
10261040
})
10271041
require.NoError(t, err)
@@ -1045,7 +1059,7 @@ func TestInstance_Volume(t *testing.T) {
10451059
require.NoError(t, err)
10461060

10471061
require.Equal(t, []logproto.Volume{
1048-
{Name: `host`, Volume: 171},
1062+
{Name: `host`, Volume: 189},
10491063
}, volumes.Volumes)
10501064
})
10511065

@@ -1056,14 +1070,14 @@ func TestInstance_Volume(t *testing.T) {
10561070
From: 0,
10571071
Through: 1.1 * 1e3, //milliseconds
10581072
Matchers: `{}`,
1059-
Limit: 3,
1073+
Limit: 5,
10601074
TargetLabels: []string{"host"},
10611075
AggregateBy: seriesvolume.Labels,
10621076
})
10631077
require.NoError(t, err)
10641078

10651079
require.Equal(t, []logproto.Volume{
1066-
{Name: `host`, Volume: 171},
1080+
{Name: `host`, Volume: 189},
10671081
}, volumes.Volumes)
10681082
})
10691083

@@ -1073,7 +1087,7 @@ func TestInstance_Volume(t *testing.T) {
10731087
From: 0,
10741088
Through: 1.1 * 1e3, //milliseconds
10751089
Matchers: `{log_stream="dispatcher"}`,
1076-
Limit: 3,
1090+
Limit: 5,
10771091
TargetLabels: []string{"host"},
10781092
AggregateBy: seriesvolume.Labels,
10791093
})
@@ -1090,7 +1104,7 @@ func TestInstance_Volume(t *testing.T) {
10901104
From: 0,
10911105
Through: 1.1 * 1e3, //milliseconds
10921106
Matchers: `{log_stream=~".+"}`,
1093-
Limit: 3,
1107+
Limit: 5,
10941108
TargetLabels: []string{"host", "job"},
10951109
AggregateBy: seriesvolume.Labels,
10961110
})

pkg/storage/stores/tsdb/single_file_index.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -409,10 +409,18 @@ func (i *TSDBIndex) Volume(
409409
}
410410
}
411411
} else {
412+
// when aggregating by labels, capture sizes for target labels if provided,
413+
// otherwise for all intersecting labels
412414
labelVolumes = make(map[string]uint64, len(ls))
413415
for _, l := range ls {
414-
if _, ok := labelsToMatch[l.Name]; l.Name != TenantLabel && includeAll || ok {
415-
labelVolumes[l.Name] += stats.KB << 10
416+
if len(targetLabels) > 0 {
417+
if _, ok := labelsToMatch[l.Name]; l.Name != TenantLabel && includeAll || ok {
418+
labelVolumes[l.Name] += stats.KB << 10
419+
}
420+
} else {
421+
if l.Name != TenantLabel {
422+
labelVolumes[l.Name] += stats.KB << 10
423+
}
416424
}
417425
}
418426
}

pkg/storage/stores/tsdb/single_file_index_test.go

+17-7
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ func TestTSDBIndex_Volume(t *testing.T) {
376376

377377
series := []LoadableSeries{
378378
{
379-
Labels: mustParseLabels(`{foo="bar", fizz="buzz", __loki_tenant__="fake"}`),
379+
Labels: mustParseLabels(`{foo="bar", fizz="buzz", us="them", __loki_tenant__="fake"}`),
380380

381381
Chunks: []index.ChunkMeta{
382382
{
@@ -396,7 +396,7 @@ func TestTSDBIndex_Volume(t *testing.T) {
396396
},
397397
},
398398
{
399-
Labels: mustParseLabels(`{foo="bar", fizz="fizz", __loki_tenant__="fake"}`),
399+
Labels: mustParseLabels(`{foo="bar", fizz="fizz", in="out", __loki_tenant__="fake"}`),
400400
Chunks: []index.ChunkMeta{
401401
{
402402
MinTime: t1.UnixMilli(),
@@ -451,8 +451,8 @@ func TestTSDBIndex_Volume(t *testing.T) {
451451
require.Equal(t, &logproto.VolumeResponse{
452452
Volumes: []logproto.Volume{
453453
{Name: `{foo="baz"}`, Volume: (50 + 60) * 1024},
454-
{Name: `{fizz="fizz", foo="bar"}`, Volume: (30 + 40) * 1024},
455-
{Name: `{fizz="buzz", foo="bar"}`, Volume: (10 + 20) * 1024},
454+
{Name: `{fizz="fizz", foo="bar", in="out"}`, Volume: (30 + 40) * 1024},
455+
{Name: `{fizz="buzz", foo="bar", us="them"}`, Volume: (10 + 20) * 1024},
456456
},
457457
Limit: 10,
458458
}, acc.Volumes())
@@ -602,6 +602,8 @@ func TestTSDBIndex_Volume(t *testing.T) {
602602
Volumes: []logproto.Volume{
603603
{Name: `foo`, Volume: (10 + 20 + 30 + 40 + 50 + 60) * 1024},
604604
{Name: `fizz`, Volume: (10 + 20 + 30 + 40) * 1024},
605+
{Name: `in`, Volume: (30 + 40) * 1024},
606+
{Name: `us`, Volume: (10 + 20) * 1024},
605607
},
606608
Limit: 10,
607609
}, acc.Volumes())
@@ -619,6 +621,8 @@ func TestTSDBIndex_Volume(t *testing.T) {
619621
Volumes: []logproto.Volume{
620622
{Name: `fizz`, Volume: (10 + 20 + 30 + 40) * 1024},
621623
{Name: `foo`, Volume: (10 + 20 + 30 + 40) * 1024},
624+
{Name: `in`, Volume: (30 + 40) * 1024},
625+
{Name: `us`, Volume: (10 + 20) * 1024},
622626
},
623627
Limit: 10,
624628
}, acc.Volumes())
@@ -635,14 +639,16 @@ func TestTSDBIndex_Volume(t *testing.T) {
635639
}, acc.Volumes())
636640
})
637641

638-
t.Run("it only returns results for the labels in the matcher", func(t *testing.T) {
639-
matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")
642+
t.Run("it only returns labels that exist on series intersecting with the matcher ", func(t *testing.T) {
643+
matcher := labels.MustNewMatcher(labels.MatchEqual, "us", "them")
640644
acc := seriesvolume.NewAccumulator(10, 10)
641645
err := tsdbIndex.Volume(context.Background(), "fake", from, through, acc, nil, nil, nil, seriesvolume.Labels, matcher)
642646
require.NoError(t, err)
643647
require.Equal(t, &logproto.VolumeResponse{
644648
Volumes: []logproto.Volume{
645-
{Name: `foo`, Volume: (10 + 20 + 30 + 40) * 1024},
649+
{Name: `fizz`, Volume: (10 + 20) * 1024},
650+
{Name: `foo`, Volume: (10 + 20) * 1024},
651+
{Name: `us`, Volume: (10 + 20) * 1024},
646652
},
647653
Limit: 10,
648654
}, acc.Volumes())
@@ -660,6 +666,8 @@ func TestTSDBIndex_Volume(t *testing.T) {
660666
Volumes: []logproto.Volume{
661667
{Name: `fizz`, Volume: (10 + 20 + 30 + 40) * 1024},
662668
{Name: `foo`, Volume: (10 + 20 + 30 + 40) * 1024},
669+
{Name: `in`, Volume: (30 + 40) * 1024},
670+
{Name: `us`, Volume: (10 + 20) * 1024},
663671
},
664672
Limit: 10,
665673
}, acc.Volumes())
@@ -689,6 +697,8 @@ func TestTSDBIndex_Volume(t *testing.T) {
689697
Volumes: []logproto.Volume{
690698
{Name: `foo`, Volume: (29 + 9 + 48) * 1024},
691699
{Name: `fizz`, Volume: (29 + 9) * 1024},
700+
{Name: `in`, Volume: (29) * 1024},
701+
{Name: `us`, Volume: (9) * 1024},
692702
},
693703
Limit: 10,
694704
}, acc.Volumes())

0 commit comments

Comments
 (0)