diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index e3891215eb..aa686d5968 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -447,6 +447,7 @@ func runStore( }), store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled), store.WithPostingGroupMaxKeySeriesRatio(conf.postingGroupMaxKeySeriesRatio), + store.WithSeriesMatchRatio(0.5), // TODO: expose series match ratio as config. store.WithIndexHeaderLazyDownloadStrategy( indexheader.IndexHeaderLazyDownloadStrategy(conf.indexHeaderLazyDownloadStrategy).StrategyToDownloadFunc(), ), diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 16e9e8c39d..6e5a656e62 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -442,6 +442,7 @@ type BucketStore struct { enableChunkHashCalculation bool enabledLazyExpandedPostings bool + seriesMatchRatio float64 postingGroupMaxKeySeriesRatio float64 sortingStrategy sortingStrategy @@ -591,6 +592,15 @@ func WithPostingGroupMaxKeySeriesRatio(postingGroupMaxKeySeriesRatio float64) Bu } } +// WithSeriesMatchRatio configures how many series would match when intersecting posting groups. +// This is used for lazy posting optimization strategy. Ratio should be within (0, 1). +// The closer to 1, it means matchers have bad selectivity. +func WithSeriesMatchRatio(seriesMatchRatio float64) BucketStoreOption { + return func(s *BucketStore) { + s.seriesMatchRatio = seriesMatchRatio + } +} + // WithDontResort disables series resorting in Store Gateway. func WithDontResort(true bool) BucketStoreOption { return func(s *BucketStore) { @@ -1065,6 +1075,7 @@ type blockSeriesClient struct { bytesLimiter BytesLimiter lazyExpandedPostingEnabled bool + seriesMatchRatio float64 // Mark posting group as lazy if it adds too many keys. 0 to disable. postingGroupMaxKeySeriesRatio float64 lazyExpandedPostingsCount prometheus.Counter @@ -1111,6 +1122,7 @@ func newBlockSeriesClient( chunkFetchDurationSum *prometheus.HistogramVec, extLsetToRemove map[string]struct{}, lazyExpandedPostingEnabled bool, + seriesMatchRatio float64, postingGroupMaxKeySeriesRatio float64, lazyExpandedPostingsCount prometheus.Counter, lazyExpandedPostingByReason *prometheus.CounterVec, @@ -1148,6 +1160,7 @@ func newBlockSeriesClient( chunkFetchDurationSum: chunkFetchDurationSum, lazyExpandedPostingEnabled: lazyExpandedPostingEnabled, + seriesMatchRatio: seriesMatchRatio, postingGroupMaxKeySeriesRatio: postingGroupMaxKeySeriesRatio, lazyExpandedPostingsCount: lazyExpandedPostingsCount, lazyExpandedPostingGroupByReason: lazyExpandedPostingByReason, @@ -1202,7 +1215,7 @@ func (b *blockSeriesClient) ExpandPostings( matchers sortedMatchers, seriesLimiter SeriesLimiter, ) error { - ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.postingGroupMaxKeySeriesRatio, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant) + ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.seriesMatchRatio, b.postingGroupMaxKeySeriesRatio, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant) if err != nil { return errors.Wrap(err, "expanded matching posting") } @@ -1635,6 +1648,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store s.metrics.chunkFetchDurationSum, extLsetToRemove, s.enabledLazyExpandedPostings, + s.seriesMatchRatio, s.postingGroupMaxKeySeriesRatio, s.metrics.lazyExpandedPostingsCount, s.metrics.lazyExpandedPostingGroupsByReason, @@ -1951,6 +1965,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq nil, extLsetToRemove, s.enabledLazyExpandedPostings, + s.seriesMatchRatio, s.postingGroupMaxKeySeriesRatio, s.metrics.lazyExpandedPostingsCount, s.metrics.lazyExpandedPostingGroupsByReason, @@ -2179,6 +2194,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR nil, nil, s.enabledLazyExpandedPostings, + s.seriesMatchRatio, s.postingGroupMaxKeySeriesRatio, s.metrics.lazyExpandedPostingsCount, s.metrics.lazyExpandedPostingGroupsByReason, @@ -2647,6 +2663,7 @@ func (r *bucketIndexReader) ExpandedPostings( ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, + seriesMatchRatio float64, postingGroupMaxKeySeriesRatio float64, lazyExpandedPostingSizeBytes prometheus.Counter, lazyExpandedPostingGroupsByReason *prometheus.CounterVec, @@ -2703,7 +2720,7 @@ func (r *bucketIndexReader) ExpandedPostings( postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil)) } - ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant) + ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, seriesMatchRatio, postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant) if err != nil { return nil, errors.Wrap(err, "fetch and expand postings") } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 3938d06357..05438d7ef0 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1305,7 +1305,7 @@ func benchmarkExpandedPostings( t.ResetTimer() for i := 0; i < t.N(); i++ { - p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant) + p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, 0.5, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant) testutil.Ok(t, err) testutil.Equals(t, c.expectedLen, len(p.postings)) } @@ -1344,7 +1344,7 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) { reg := prometheus.NewRegistry() dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"}) dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"}) - ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant) + ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, 0.5, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant) testutil.Ok(t, err) testutil.Equals(t, ps, (*lazyExpandedPostings)(nil)) // Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings. @@ -1384,7 +1384,7 @@ func TestLazyExpandedPostingsEmptyPostings(t *testing.T) { reg := prometheus.NewRegistry() dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"}) dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"}) - ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant) + ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, 0.5, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant) testutil.Ok(t, err) // We expect emptyLazyPostings rather than lazy postings with 0 length but with matchers. testutil.Equals(t, ps, emptyLazyPostings) @@ -2931,6 +2931,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet dummyHistogram, nil, false, + 0.5, 0, dummyCounter, dummyCounterVec, @@ -3591,7 +3592,7 @@ func TestExpandedPostingsRace(t *testing.T) { wg.Add(1) go func(i int, bb *bucketBlock) { - refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant) + refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, 0.5, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant) testutil.Ok(t, err) defer wg.Done() diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go index 57b48cc342..325e92e904 100644 --- a/pkg/store/lazy_postings.go +++ b/pkg/store/lazy_postings.go @@ -214,6 +214,7 @@ func fetchLazyExpandedPostings( bytesLimiter BytesLimiter, addAllPostings bool, lazyExpandedPostingEnabled bool, + seriesMatchRatio float64, postingGroupMaxKeySeriesRatio float64, lazyExpandedPostingSizeBytes prometheus.Counter, lazyExpandedPostingGroupsByReason *prometheus.CounterVec, @@ -237,7 +238,7 @@ func fetchLazyExpandedPostings( r, postingGroups, int64(r.block.estimatedMaxSeriesSize), - 0.5, // TODO(yeya24): Expose this as a flag. + seriesMatchRatio, postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go index cb52dac412..3b61d4be08 100644 --- a/pkg/store/lazy_postings_test.go +++ b/pkg/store/lazy_postings_test.go @@ -616,6 +616,23 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) { {name: "bar", addKeys: []string{"foo"}, cardinality: 250000, existentKeys: 1, lazy: true}, }, }, + { + name: "two posting groups with add keys, group not marked as lazy because of lower series match ratio", + inputPostings: map[string]map[string]index.Range{ + "foo": {"bar": index.Range{End: 44}}, + "bar": {"foo": index.Range{Start: 44, End: 5052}}, + }, + seriesMaxSize: 1000, + seriesMatchRatio: 0.1, + postingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}}, + {name: "bar", addKeys: []string{"foo"}}, + }, + expectedPostingGroups: []*postingGroup{ + {name: "foo", addKeys: []string{"bar"}, cardinality: 10, existentKeys: 1}, + {name: "bar", addKeys: []string{"foo"}, cardinality: 1251, existentKeys: 1, lazy: false}, + }, + }, { name: "three posting groups with add keys, two small posting group and a very large posting group, large one become lazy", inputPostings: map[string]map[string]index.Range{