From 6e9db73c28dd4db04c7bf7b960ecbd9b7435127e Mon Sep 17 00:00:00 2001 From: alanprot Date: Fri, 10 Jan 2025 00:33:49 -0800 Subject: [PATCH 1/2] Share Config for Expanded Postings Cache Size Across All Tenants Signed-off-by: alanprot --- pkg/storage/tsdb/expanded_postings_cache.go | 25 +++++++++++-------- .../tsdb/expanded_postings_cache_test.go | 15 +++++------ 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index 3ea8da709e..7f1496cf65 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/oklog/ulid" @@ -100,6 +101,9 @@ func (cfg *PostingsCacheConfig) RegisterFlagsWithPrefix(prefix, block string, f type ExpandedPostingsCacheFactory struct { seedByHash *seedByHash cfg TSDBPostingsCacheConfig + + headCachedBytes atomic.Int64 + blocksCachedBytes atomic.Int64 } func NewExpandedPostingsCacheFactory(cfg TSDBPostingsCacheConfig) *ExpandedPostingsCacheFactory { @@ -118,7 +122,7 @@ func NewExpandedPostingsCacheFactory(cfg TSDBPostingsCacheConfig) *ExpandedPosti } func (f *ExpandedPostingsCacheFactory) NewExpandedPostingsCache(userId string, metrics *ExpandedPostingsCacheMetrics) ExpandedPostingsCache { - return newBlocksPostingsForMatchersCache(userId, f.cfg, metrics, f.seedByHash) + return newBlocksPostingsForMatchersCache(userId, f.cfg, metrics, f.seedByHash, &f.headCachedBytes, &f.blocksCachedBytes) } type ExpandedPostingsCache interface { @@ -138,7 +142,7 @@ type blocksPostingsForMatchersCache struct { seedByHash *seedByHash } -func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics, seedByHash *seedByHash) ExpandedPostingsCache { +func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics, seedByHash *seedByHash, headCachedBytes, blocksCachedBytes *atomic.Int64) ExpandedPostingsCache { if cfg.PostingsForMatchers == nil { cfg.PostingsForMatchers = tsdb.PostingsForMatchers } @@ -148,8 +152,8 @@ func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfi } return &blocksPostingsForMatchersCache{ - headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow), - blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow), + headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, headCachedBytes, "head", metrics, cfg.timeNow), + blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, blocksCachedBytes, "block", metrics, cfg.timeNow), postingsForMatchersFunc: cfg.PostingsForMatchers, timeNow: cfg.timeNow, metrics: metrics, @@ -333,10 +337,10 @@ type fifoCache[V any] struct { // Fields from here should be locked cachedMtx sync.RWMutex cached *list.List - cachedBytes int64 + cachedBytes *atomic.Int64 } -func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *fifoCache[V] { +func newFifoCache[V any](cfg PostingsCacheConfig, cachedBytes *atomic.Int64, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *fifoCache[V] { return &fifoCache[V]{ cachedValues: new(sync.Map), cached: list.New(), @@ -344,6 +348,7 @@ func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *Expanded timeNow: timeNow, name: name, metrics: *metrics, + cachedBytes: cachedBytes, } } @@ -417,7 +422,7 @@ func (c *fifoCache[V]) shouldEvictHead() (string, bool) { return "", false } - if c.cachedBytes > c.cfg.MaxBytes { + if c.cachedBytes.Load() > c.cfg.MaxBytes { return "full", true } @@ -437,7 +442,7 @@ func (c *fifoCache[V]) evictHead() { c.cached.Remove(front) oldestKey := front.Value.(string) if oldest, loaded := c.cachedValues.LoadAndDelete(oldestKey); loaded { - c.cachedBytes -= oldest.(*cacheEntryPromise[V]).sizeBytes + c.cachedBytes.Add(-oldest.(*cacheEntryPromise[V]).sizeBytes) } } @@ -449,7 +454,7 @@ func (c *fifoCache[V]) created(key string, sizeBytes int64) { c.cachedMtx.Lock() defer c.cachedMtx.Unlock() c.cached.PushBack(key) - c.cachedBytes += sizeBytes + c.cachedBytes.Add(sizeBytes) } func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) { @@ -459,7 +464,7 @@ func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) { c.cachedMtx.Lock() defer c.cachedMtx.Unlock() - c.cachedBytes += newSizeBytes - oldSize + c.cachedBytes.Add(newSizeBytes - oldSize) } type cacheEntryPromise[V any] struct { diff --git a/pkg/storage/tsdb/expanded_postings_cache_test.go b/pkg/storage/tsdb/expanded_postings_cache_test.go index 19272c00e7..7fd46e3b5d 100644 --- a/pkg/storage/tsdb/expanded_postings_cache_test.go +++ b/pkg/storage/tsdb/expanded_postings_cache_test.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" "sync" + "sync/atomic" "testing" "time" @@ -13,7 +14,6 @@ import ( "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" - "go.uber.org/atomic" ) func TestCacheKey(t *testing.T) { @@ -52,14 +52,14 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) { MaxBytes: 10 << 20, } m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry()) - cache := newFifoCache[int](cfg, "test", m, time.Now) + cache := newFifoCache[int](cfg, &atomic.Int64{}, "test", m, time.Now) calls := atomic.Int64{} concurrency := 100 wg := sync.WaitGroup{} wg.Add(concurrency) fetchFunc := func() (int, int64, error) { - calls.Inc() + calls.Add(1) time.Sleep(100 * time.Millisecond) return 0, 0, nil } @@ -81,7 +81,7 @@ func TestFifoCacheDisabled(t *testing.T) { cfg.Enabled = false m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry()) timeNow := time.Now - cache := newFifoCache[int](cfg, "test", m, timeNow) + cache := newFifoCache[int](cfg, &atomic.Int64{}, "test", m, timeNow) old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) { return 1, 0, nil }) @@ -124,7 +124,8 @@ func TestFifoCacheExpire(t *testing.T) { r := prometheus.NewPedanticRegistry() m := NewPostingCacheMetrics(r) timeNow := time.Now - cache := newFifoCache[int](c.cfg, "test", m, timeNow) + cachedBytes := atomic.Int64{} + cache := newFifoCache[int](c.cfg, &cachedBytes, "test", m, timeNow) for i := 0; i < numberOfKeys; i++ { key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) @@ -169,7 +170,7 @@ func TestFifoCacheExpire(t *testing.T) { for i := 0; i < numberOfKeys; i++ { key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) - originalSize := cache.cachedBytes + originalSize := cache.cachedBytes.Load() p, loaded := cache.getPromiseForKey(key, func() (int, int64, error) { return 2, 18, nil }) @@ -177,7 +178,7 @@ func TestFifoCacheExpire(t *testing.T) { // New value require.Equal(t, 2, p.v) // Total Size Updated - require.Equal(t, originalSize+10, cache.cachedBytes) + require.Equal(t, originalSize+10, cache.cachedBytes.Load()) } err := testutil.GatherAndCompare(r, bytes.NewBufferString(fmt.Sprintf(` From 16b3926f16de0ef30506a8a5cd12ad88293a11ed Mon Sep 17 00:00:00 2001 From: alanprot Date: Fri, 10 Jan 2025 01:16:43 -0800 Subject: [PATCH 2/2] test + lint Signed-off-by: alanprot --- pkg/ingester/ingester_test.go | 84 +++++++++++++++++++ pkg/storage/tsdb/expanded_postings_cache.go | 2 +- .../tsdb/expanded_postings_cache_test.go | 2 +- 3 files changed, 86 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 15fec1d1fd..495c89a7bc 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -5523,6 +5523,90 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) { wg.Wait() } +func TestExpendedPostingsCacheGlobalLimit(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + maxBytes := int64(1024) + users := []string{"test1", "test2"} + cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour} + cfg.BlocksStorageConfig.TSDB.PostingsCache = cortex_tsdb.TSDBPostingsCacheConfig{ + Blocks: cortex_tsdb.PostingsCacheConfig{ + Ttl: time.Hour, + MaxBytes: maxBytes, + Enabled: true, + }, + Head: cortex_tsdb.PostingsCacheConfig{ + Ttl: time.Hour, + MaxBytes: maxBytes, + Enabled: true, + }, + } + + cfg.LifecyclerConfig.JoinAfter = 0 + + r := prometheus.NewRegistry() + i, err := prepareIngesterWithBlocksStorage(t, cfg, r) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + metricNames := []string{"metric1", "metric2"} + + // Generate 4 hours of data so we have 1 block + head + totalSamples := 4 * 60 + var samples = make([]cortexpb.Sample, 0, totalSamples) + + for i := 0; i < totalSamples; i++ { + samples = append(samples, cortexpb.Sample{ + Value: float64(i), + TimestampMs: int64(i * 60 * 1000), + }) + } + + lbls := make([]labels.Labels, 0, len(samples)) + for j := 0; j < 10; j++ { + for i := 0; i < len(samples); i++ { + lbls = append(lbls, labels.FromStrings(labels.MetricName, metricNames[i%len(metricNames)], "a", fmt.Sprintf("aaa%v", j))) + } + } + + for i := len(samples); i < len(lbls); i++ { + samples = append(samples, samples[i%len(samples)]) + } + + for _, u := range users { + ctx := user.InjectOrgID(context.Background(), u) + req := cortexpb.ToWriteRequest(lbls, samples, nil, nil, cortexpb.API) + _, err = i.Push(ctx, req) + require.NoError(t, err) + + i.compactBlocks(ctx, false, nil) + + s := &mockQueryStreamServer{ctx: ctx} + + err = i.QueryStream(&client.QueryRequest{ + StartTimestampMs: 0, + EndTimestampMs: math.MaxInt64, + Matchers: []*client.LabelMatcher{ + { + Type: client.EQUAL, + Name: "__name__", + Value: strings.Repeat("a", int(maxBytes/2)), // make sure the size it bigger than the max size + }, + }, + }, s) + + require.NoError(t, err) + } + + err = testutil.GatherAndCompare(r, bytes.NewBufferString(` + # HELP cortex_ingester_expanded_postings_cache_evicts_total Total number of evictions in the cache, excluding items that got evicted due to TTL. + # TYPE cortex_ingester_expanded_postings_cache_evicts_total counter + cortex_ingester_expanded_postings_cache_evicts_total{cache="block",reason="full"} 1 + cortex_ingester_expanded_postings_cache_evicts_total{cache="head",reason="full"} 1 +`), "cortex_ingester_expanded_postings_cache_evicts_total") + require.NoError(t, err) +} + func TestExpendedPostingsCache(t *testing.T) { cfg := defaultIngesterTestConfig(t) cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour} diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index 7f1496cf65..4e0771bbc1 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -8,7 +8,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "github.com/oklog/ulid" @@ -19,6 +18,7 @@ import ( "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/index" "github.com/segmentio/fasthash/fnv1a" + "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/util/extract" logutil "github.com/cortexproject/cortex/pkg/util/log" diff --git a/pkg/storage/tsdb/expanded_postings_cache_test.go b/pkg/storage/tsdb/expanded_postings_cache_test.go index 7fd46e3b5d..1f45d152a4 100644 --- a/pkg/storage/tsdb/expanded_postings_cache_test.go +++ b/pkg/storage/tsdb/expanded_postings_cache_test.go @@ -5,7 +5,6 @@ import ( "fmt" "strings" "sync" - "sync/atomic" "testing" "time" @@ -14,6 +13,7 @@ import ( "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" + "go.uber.org/atomic" ) func TestCacheKey(t *testing.T) {