Skip to content

Commit

Permalink
Test for nil on expire expanded postings (#6521)
Browse files Browse the repository at this point in the history
* Test for nil on expire expanded postings

Signed-off-by: alanprot <[email protected]>

* stopping ingester

Signed-off-by: alanprot <[email protected]>

* refactor the test to not timeout

Signed-off-by: alanprot <[email protected]>

---------

Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot authored Jan 22, 2025
1 parent c1a2134 commit 4477add
Showing 1 changed file with 82 additions and 0 deletions.
82 changes: 82 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,77 @@ func TestMatcherCache(t *testing.T) {
`, callPerMatcher*numberOfDifferentMatchers-numberOfDifferentMatchers, cfg.MatchersCacheMaxItems, callPerMatcher*numberOfDifferentMatchers)), "ingester_matchers_cache_requests_total", "ingester_matchers_cache_hits_total", "ingester_matchers_cache_items", "ingester_matchers_cache_max_items", "ingester_matchers_cache_evicted_total"))
}

func TestIngesterDeletionRace(t *testing.T) {
registry := prometheus.NewRegistry()
limits := defaultLimitsTestConfig()
tenantLimits := newMockTenantLimits(map[string]*validation.Limits{userID: &limits})
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.PostingsCache = cortex_tsdb.TSDBPostingsCacheConfig{
Head: cortex_tsdb.PostingsCacheConfig{
Enabled: true,
Ttl: time.Hour,
MaxBytes: 1024 * 1024 * 1024,
},
Blocks: cortex_tsdb.PostingsCacheConfig{
Enabled: true,
Ttl: time.Hour,
MaxBytes: 1024 * 1024 * 1024,
},
}

dir := t.TempDir()
chunksDir := filepath.Join(dir, "chunks")
blocksDir := filepath.Join(dir, "blocks")
require.NoError(t, os.Mkdir(chunksDir, os.ModePerm))
require.NoError(t, os.Mkdir(blocksDir, os.ModePerm))

ing, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, tenantLimits, blocksDir, registry, false)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
// Wait until it's ACTIVE
test.Poll(t, time.Second, ring.ACTIVE, func() interface{} {
return ing.lifecycler.GetState()
})

numberOfTenants := 50
wg := sync.WaitGroup{}
wg.Add(numberOfTenants)

for i := 0; i < numberOfTenants; i++ {
go func() {
defer wg.Done()
u := fmt.Sprintf("userId_%v", i)
ctx := user.InjectOrgID(context.Background(), u)
samples := []cortexpb.Sample{{Value: 2, TimestampMs: 10}}
_, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels.FromStrings(labels.MetricName, "name")}, samples, nil, nil, cortexpb.API))
require.NoError(t, err)
ing.getTSDB(u).postingCache = &wrappedExpandedPostingsCache{ExpandedPostingsCache: ing.getTSDB(u).postingCache, purgeDelay: 10 * time.Millisecond}
ing.getTSDB(u).deletionMarkFound.Store(true) // lets force close the tenant
}()
}

wg.Wait()

ctx, c := context.WithCancel(context.Background())
defer c()

wg.Add(1)
go func() {
wg.Done()
ing.expirePostingsCache(ctx) //nolint:errcheck
}()

go func() {
wg.Wait() // make sure we clean after we started the purge go routine
ing.closeAndDeleteIdleUserTSDBs(ctx) //nolint:errcheck
}()

test.Poll(t, 5*time.Second, 0, func() interface{} {
return len(ing.getTSDBUsers())
})
}

func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
limits := defaultLimitsTestConfig()
userID := "1"
Expand Down Expand Up @@ -3528,6 +3599,17 @@ func (m *mockMetricsForLabelMatchersStreamServer) Context() context.Context {
return m.ctx
}

type wrappedExpandedPostingsCache struct {
cortex_tsdb.ExpandedPostingsCache

purgeDelay time.Duration
}

func (w *wrappedExpandedPostingsCache) PurgeExpiredItems() {
time.Sleep(w.purgeDelay)
w.ExpandedPostingsCache.PurgeExpiredItems()
}

type mockQueryStreamServer struct {
grpc.ServerStream
ctx context.Context
Expand Down

0 comments on commit 4477add

Please sign in to comment.