Fix: expanded postings can cache wrong data when queries are issued "in the future" (#6562)

* improve fuzz test for expanded postings cache

Signed-off-by: alanprot <[email protected]>

* create more tests for the expanded postings cache

Signed-off-by: alanprot <[email protected]>

* add a get-series call to the test

Signed-off-by: alanprot <[email protected]>

* do not use CachedBlockChunkQuerier when the query time range is completely after the last sample added to the head (see the sketch below)

Signed-off-by: alanprot <[email protected]>

* add comments

Signed-off-by: alanprot <[email protected]>

* increase the number of fuzz test runs from 100 to 300

Signed-off-by: alanprot <[email protected]>

* add get-series fuzz testing

Signed-off-by: alanprot <[email protected]>

---------

Signed-off-by: alanprot <[email protected]>
alanprot authored Jan 29, 2025
1 parent 653ea65 commit 4b32f29
Showing 4 changed files with 283 additions and 21 deletions.
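
The core of the fix is visible in pkg/ingester/ingester.go below: the cached block chunk querier is bypassed whenever the query range starts after the newest sample in the head, since tsdb.PostingsForMatchers can return invalid data for such "future" queries and the bad result would otherwise be cached. A minimal sketch of that selection logic, assuming the import paths and aliases used in the Cortex repository (the helper name chunkQuerierFor and the headMaxTime parameter are illustrative, not part of the commit):

package ingester

import (
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb"

	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
)

// chunkQuerierFor mirrors the decision made in blockChunkQuerierFunc below:
// only use the expanded-postings-cache querier when the cache exists and the
// query can overlap data already written to the head.
func chunkQuerierFor(b tsdb.BlockReader, mint, maxt, headMaxTime int64, cache cortex_tsdb.ExpandedPostingsCache) (storage.ChunkQuerier, error) {
	if cache == nil || mint > headMaxTime {
		// Query is entirely "in the future" (or there is no cache): fall back
		// to the plain block chunk querier so nothing wrong gets cached.
		return tsdb.NewBlockChunkQuerier(b, mint, maxt)
	}
	return cortex_tsdb.NewCachedBlockChunkQuerier(cache, b, mint, maxt)
}
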
67 changes: 52 additions & 15 deletions integration/query_fuzz_test.go
@@ -433,7 +433,7 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
scrapeInterval,
i*numSamples,
numSamples,
prompb.Label{Name: "j", Value: fmt.Sprintf("%d", j)},
prompb.Label{Name: "test_label", Value: fmt.Sprintf("test_label_value_%d", j)},
)
ss[i*numberOfLabelsPerSeries+j] = series

@@ -453,11 +453,18 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
ps := promqlsmith.New(rnd, lbls, opts...)

// Create the queries with the original labels
testRun := 100
testRun := 300
queries := make([]string, testRun)
matchers := make([]string, testRun)
for i := 0; i < testRun; i++ {
expr := ps.WalkRangeQuery()
queries[i] = expr.Pretty(0)
matchers[i] = storepb.PromMatchersToString(
append(
ps.WalkSelectors(),
labels.MustNewMatcher(labels.MatchEqual, "__name__", fmt.Sprintf("test_series_%d", i%numSeries)),
)...,
)
}

// Lets run multiples iterations and create new series every iteration
@@ -472,7 +479,7 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
scrapeInterval,
i*numSamples,
numSamples,
prompb.Label{Name: "j", Value: fmt.Sprintf("%d", j)},
prompb.Label{Name: "test_label", Value: fmt.Sprintf("test_label_value_%d", j)},
prompb.Label{Name: "k", Value: fmt.Sprintf("%d", k)},
)
}
@@ -485,42 +492,72 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
}

type testCase struct {
query string
res1, res2 model.Value
err1, err2 error
query string
qt string
res1, res2 model.Value
sres1, sres2 []model.LabelSet
err1, err2 error
}

queryStart := time.Now().Add(-time.Hour * 24)
queryEnd := time.Now()
cases := make([]*testCase, 0, 200)
cases := make([]*testCase, 0, len(queries)*3)

for _, query := range queries {
res1, err1 := c1.QueryRange(query, queryStart, queryEnd, scrapeInterval)
res2, err2 := c2.QueryRange(query, queryStart, queryEnd, scrapeInterval)
fuzzyTime := time.Duration(rand.Int63n(time.Now().UnixMilli() - start.UnixMilli()))
queryEnd := start.Add(fuzzyTime * time.Millisecond)
res1, err1 := c1.Query(query, queryEnd)
res2, err2 := c2.Query(query, queryEnd)
cases = append(cases, &testCase{
query: query,
qt: "instant",
res1: res1,
res2: res2,
err1: err1,
err2: err2,
})
res1, err1 = c1.QueryRange(query, start, queryEnd, scrapeInterval)
res2, err2 = c2.QueryRange(query, start, queryEnd, scrapeInterval)
cases = append(cases, &testCase{
query: query,
qt: "range query",
res1: res1,
res2: res2,
err1: err1,
err2: err2,
})
}

for _, m := range matchers {
fuzzyTime := time.Duration(rand.Int63n(time.Now().UnixMilli() - start.UnixMilli()))
queryEnd := start.Add(fuzzyTime * time.Millisecond)
res1, err := c1.Series([]string{m}, start, queryEnd)
require.NoError(t, err)
res2, err := c2.Series([]string{m}, start, queryEnd)
require.NoError(t, err)
cases = append(cases, &testCase{
query: m,
qt: "get series",
sres1: res1,
sres2: res2,
})
}

failures := 0
for i, tc := range cases {
qt := "range query"
if tc.err1 != nil || tc.err2 != nil {
if !cmp.Equal(tc.err1, tc.err2) {
t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, qt, tc.query, tc.err1, tc.err2)
t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, tc.qt, tc.query, tc.err1, tc.err2)
failures++
}
} else if shouldUseSampleNumComparer(tc.query) {
if !cmp.Equal(tc.res1, tc.res2, sampleNumComparer) {
t.Logf("case %d # of samples mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String())
t.Logf("case %d # of samples mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, tc.qt, tc.query, tc.res1.String(), tc.res2.String())
failures++
}
} else if !cmp.Equal(tc.res1, tc.res2, comparer) {
t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String())
t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, tc.qt, tc.query, tc.res1.String(), tc.res2.String())
failures++
} else if !cmp.Equal(tc.sres1, tc.sres1, labelSetsComparer) {
t.Logf("case %d results mismatch.\n%s: %s\nsres1: %s\nsres2: %s\n", i, tc.qt, tc.query, tc.sres1, tc.sres2)
failures++
}
}
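
Both the new instant-query and get-series fuzzing above pick a random query end time between the start of the test and "now". A small sketch of that pattern as a standalone helper (the function name randomQueryEnd is illustrative, not part of the commit; it assumes some time has elapsed since start):

import (
	"math/rand"
	"time"
)

// randomQueryEnd returns a timestamp chosen uniformly at random in [start, now),
// matching the fuzzyTime computation used in the test above.
func randomQueryEnd(start time.Time) time.Time {
	// rand.Int63n(n) returns a value in [0, n) and panics if n <= 0, so the
	// caller must guarantee that now is strictly after start.
	fuzzyMillis := rand.Int63n(time.Now().UnixMilli() - start.UnixMilli())
	return start.Add(time.Duration(fuzzyMillis) * time.Millisecond)
}
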
28 changes: 22 additions & 6 deletions pkg/ingester/ingester.go
@@ -2283,6 +2283,27 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error)
return db, nil
}

func (i *Ingester) blockChunkQuerierFunc(userId string) tsdb.BlockChunkQuerierFunc {
return func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
db := i.getTSDB(userId)

var postingCache cortex_tsdb.ExpandedPostingsCache
if db != nil {
postingCache = db.postingCache
}

// Caching expanded postings for queries that are "in the future" may lead to incorrect results being cached.
// This occurs because the tsdb.PostingsForMatchers function can return invalid data in such scenarios.
// For more details, see: https://github.com/cortexproject/cortex/issues/6556
// TODO: alanprot: Consider removing this logic when prometheus is updated as this logic is "fixed" upstream.
if postingCache == nil || mint > db.Head().MaxTime() {
return tsdb.NewBlockChunkQuerier(b, mint, maxt)
}

return cortex_tsdb.NewCachedBlockChunkQuerier(postingCache, b, mint, maxt)
}
}

// createTSDB creates a TSDB for a given userID, and returns the created db.
func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
tsdbPromReg := prometheus.NewRegistry()
@@ -2346,12 +2367,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
OutOfOrderCapMax: i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax,
EnableOverlappingCompaction: false, // Always let compactors handle overlapped blocks, e.g. OOO blocks.
EnableNativeHistograms: i.cfg.BlocksStorageConfig.TSDB.EnableNativeHistograms,
BlockChunkQuerierFunc: func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
if postingCache != nil {
return cortex_tsdb.NewCachedBlockChunkQuerier(postingCache, b, mint, maxt)
}
return tsdb.NewBlockChunkQuerier(b, mint, maxt)
},
BlockChunkQuerierFunc: i.blockChunkQuerierFunc(userID),
}, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)
195 changes: 195 additions & 0 deletions pkg/ingester/ingester_test.go
@@ -5605,6 +5605,201 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) {
wg.Wait()
}

func TestExpendedPostingsCacheMatchers(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval = time.Second
cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}
cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled = true
cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled = true
cfg.QueryIngestersWithin = 24 * time.Hour

ctx := user.InjectOrgID(context.Background(), userID)

r := prometheus.NewRegistry()
ing, err := prepareIngesterWithBlocksStorage(t, cfg, r)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

// Wait until the ingester is ACTIVE
test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
return ing.lifecycler.GetState()
})

numberOfMetricNames := 10
seriesPerMetricsNames := 25
timeStamp := int64(60 * 1000)
seriesCreated := map[string]labels.Labels{}

for i := 0; i < numberOfMetricNames; i++ {
metricName := fmt.Sprintf("metric_%v", i)
for j := 0; j < seriesPerMetricsNames; j++ {
s := labels.FromStrings(labels.MetricName, metricName, "labelA", fmt.Sprintf("series_%v", j))
_, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{s}, []cortexpb.Sample{{Value: 2, TimestampMs: timeStamp}}, nil, nil, cortexpb.API))
seriesCreated[s.String()] = s
require.NoError(t, err)
}
}

db := ing.getTSDB(userID)

type testCase struct {
matchers []*client.LabelMatcher
}

cases := []testCase{}

nameMatcher := &client.LabelMatcher{
Type: client.EQUAL,
Name: labels.MetricName,
Value: "metric_0",
}

for i := 0; i < 4; i++ {
tc := testCase{
matchers: []*client.LabelMatcher{nameMatcher},
}

switch client.MatchType(i) {
case client.EQUAL | client.NOT_EQUAL:
tc.matchers = append(tc.matchers, &client.LabelMatcher{
Type: client.MatchType(i),
Name: "labelA",
Value: "series_0",
})
default:
tc.matchers = append(tc.matchers, &client.LabelMatcher{
Type: client.MatchType(i),
Name: "labelA",
Value: "series_.*",
})
}
cases = append(cases, tc)
}

for _, v := range []string{".*", "", ".+"} {
cases = append(cases,
testCase{
matchers: []*client.LabelMatcher{
nameMatcher,
{
Type: client.REGEX_MATCH,
Name: "labelA",
Value: v,
},
},
},
testCase{
matchers: []*client.LabelMatcher{
nameMatcher,
{
Type: client.REGEX_NO_MATCH,
Name: "labelA",
Value: v,
},
},
},
)
}

ranges := []struct {
startTs, endTs int64
hasSamples bool
}{
// Totally in the past
{
startTs: 0,
endTs: timeStamp / 2,
hasSamples: false,
},
{
startTs: timeStamp / 2,
endTs: timeStamp,
hasSamples: true,
},
{
startTs: timeStamp / 2,
endTs: timeStamp * 2,
hasSamples: true,
},
{
startTs: timeStamp + 1,
endTs: timeStamp * 2,
hasSamples: false,
},
}

verify := func(t *testing.T, tc testCase, startTs, endTs int64, hasSamples bool) {

expectedCount := len(seriesCreated)
matchers, err := client.FromLabelMatchers(ing.matchersCache, tc.matchers)
require.NoError(t, err)
for _, s := range seriesCreated {
for _, m := range matchers {
if !m.Matches(s.Get(m.Name)) {
expectedCount--
break
}
}
}

seriesResponse, err := ing.MetricsForLabelMatchers(ctx, &client.MetricsForLabelMatchersRequest{
StartTimestampMs: startTs,
EndTimestampMs: endTs,
MatchersSet: []*client.LabelMatchers{
{
Matchers: tc.matchers,
},
},
})
require.NoError(t, err)
if hasSamples {
require.Len(t, seriesResponse.Metric, expectedCount)
} else {
require.Len(t, seriesResponse.Metric, 0)
}

s := &mockQueryStreamServer{ctx: ctx}
err = ing.QueryStream(&client.QueryRequest{
StartTimestampMs: startTs,
EndTimestampMs: endTs,
Matchers: tc.matchers,
}, s)
require.NoError(t, err)
if hasSamples {
require.Equal(t, expectedCount, len(s.series))
} else {
require.Equal(t, 0, len(s.series))
}
}

for _, tc := range cases {
testName := ""
for _, matcher := range tc.matchers {
t, _ := matcher.MatcherType()
testName += matcher.Name + t.String() + matcher.Value + "|"

}
t.Run(fmt.Sprintf("%v", testName), func(t *testing.T) {
for _, r := range ranges {
t.Run(fmt.Sprintf("start=%v,end=%v", r.startTs, r.endTs), func(t *testing.T) {
db.postingCache.Clear()

// lets run 2 times to hit the cache
for i := 0; i < 2; i++ {
verify(t, tc, r.startTs, r.endTs, r.hasSamples)
}

// run the test again with all other ranges
for _, r1 := range ranges {
verify(t, tc, r1.startTs, r1.endTs, r1.hasSamples)
}
})
}
})
}
}

func TestExpendedPostingsCache(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval = time.Second