Skip to content

Commit

Permalink
create more tests on the expanded postings cache
Browse files Browse the repository at this point in the history
Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot committed Jan 28, 2025
1 parent 76adacd commit 5bc90ec
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 0 deletions.
175 changes: 175 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5605,6 +5605,181 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) {
wg.Wait()
}

func TestExpendedPostingsCacheMatchers(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval = time.Second
cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}
cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled = true
cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled = true

ctx := user.InjectOrgID(context.Background(), userID)

r := prometheus.NewRegistry()
ing, err := prepareIngesterWithBlocksStorage(t, cfg, r)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

// Wait until the ingester is ACTIVE
test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
return ing.lifecycler.GetState()
})

numberOfMetricNames := 10
seriesPerMetricsNames := 25
timeStamp := int64(60 * 1000)
seriesCreated := map[string]labels.Labels{}

for i := 0; i < numberOfMetricNames; i++ {
metricName := fmt.Sprintf("metric_%v", i)
for j := 0; j < seriesPerMetricsNames; j++ {
s := labels.FromStrings(labels.MetricName, metricName, "labelA", fmt.Sprintf("series_%v", j))
_, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{s}, []cortexpb.Sample{{Value: 2, TimestampMs: timeStamp}}, nil, nil, cortexpb.API))
seriesCreated[s.String()] = s
require.NoError(t, err)
}
}

db := ing.getTSDB(userID)

type testCase struct {
matchers []*client.LabelMatcher
}

cases := []testCase{}

nameMatcher := &client.LabelMatcher{
Type: client.EQUAL,
Name: labels.MetricName,
Value: "metric_0",
}

for i := 0; i < 4; i++ {
tc := testCase{
matchers: []*client.LabelMatcher{nameMatcher},
}

switch client.MatchType(i) {
case client.EQUAL | client.NOT_EQUAL:
tc.matchers = append(tc.matchers, &client.LabelMatcher{
Type: client.MatchType(i),
Name: "labelA",
Value: "series_0",
})
default:
tc.matchers = append(tc.matchers, &client.LabelMatcher{
Type: client.MatchType(i),
Name: "labelA",
Value: "series_.*",
})
}
cases = append(cases, tc)
}

cases = append(cases,
testCase{
matchers: []*client.LabelMatcher{
nameMatcher,
{
Type: client.REGEX_MATCH,
Name: "labelA",
Value: "",
},
},
},
testCase{
matchers: []*client.LabelMatcher{
nameMatcher,
{
Type: client.REGEX_NO_MATCH,
Name: "labelA",
Value: "",
},
},
},
)

ranges := []struct {
startTs, endTs int64
hasSamples bool
}{
// Totally in the past
{
startTs: 0,
endTs: timeStamp / 2,
hasSamples: false,
},
{
startTs: timeStamp / 2,
endTs: timeStamp,
hasSamples: true,
},
{
startTs: timeStamp / 2,
endTs: timeStamp * 2,
hasSamples: true,
},
{
startTs: timeStamp + 1,
endTs: timeStamp * 2,
hasSamples: false,
},
}

verify := func(t *testing.T, tc testCase, startTs, endTs int64, hasSamples bool) {
s := &mockQueryStreamServer{ctx: ctx}
err := ing.QueryStream(&client.QueryRequest{
StartTimestampMs: startTs,
EndTimestampMs: endTs,
Matchers: tc.matchers,
}, s)
require.NoError(t, err)
if hasSamples {
expectedCount := len(seriesCreated)
matchers, err := client.FromLabelMatchers(ing.matchersCache, tc.matchers)
require.NoError(t, err)
for _, s := range seriesCreated {
for _, m := range matchers {
if !m.Matches(s.Get(m.Name)) {
expectedCount--
break
}
}
}

require.Equal(t, expectedCount, len(s.series))
} else {
require.Equal(t, 0, len(s.series))
}
}

for _, tc := range cases {
testName := ""
for _, matcher := range tc.matchers {
t, _ := matcher.MatcherType()
testName += matcher.Name + t.String() + matcher.Value + "|"

}
t.Run(fmt.Sprintf("%v", testName), func(t *testing.T) {
for _, r := range ranges {
t.Run(fmt.Sprintf("start=%v,end=%v", r.startTs, r.endTs), func(t *testing.T) {
db.postingCache.Clear()

// lets run 2 times to hit the cache
for i := 0; i < 2; i++ {
verify(t, tc, r.startTs, r.endTs, r.hasSamples)
}

// run the test again with all other ranges
for _, r1 := range ranges {
verify(t, tc, r1.startTs, r1.endTs, r1.hasSamples)
}
})
}
})
}
}

func TestExpendedPostingsCache(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval = time.Second
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/tsdb/expanded_postings_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ type ExpandedPostingsCache interface {
PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
ExpireSeries(metric labels.Labels)
PurgeExpiredItems()
Clear()
Size() int
}

Expand All @@ -138,6 +139,12 @@ type blocksPostingsForMatchersCache struct {

metrics *ExpandedPostingsCacheMetrics
seedByHash *seedByHash
cfg TSDBPostingsCacheConfig
}

func (c *blocksPostingsForMatchersCache) Clear() {
c.headCache = newFifoCache[[]storage.SeriesRef](c.cfg.Head, "head", c.metrics, c.cfg.timeNow)
c.blocksCache = newFifoCache[[]storage.SeriesRef](c.cfg.Blocks, "block", c.metrics, c.cfg.timeNow)
}

func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics, seedByHash *seedByHash) ExpandedPostingsCache {
Expand All @@ -157,6 +164,7 @@ func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfi
metrics: metrics,
seedByHash: seedByHash,
userId: userId,
cfg: cfg,
}
}

Expand Down

0 comments on commit 5bc90ec

Please sign in to comment.