diff --git a/CHANGELOG.md b/CHANGELOG.md
index d2948ef8b79..b06b2073920 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@
 * [ENHANCEMENT] Docs: use long flag names in runbook commands. #4088
 * [ENHANCEMENT] Query-frontend: log caller user agent in query stats logs. #4093
 * [ENHANCEMENT] Store-gateway: add `data_type` label with values on `cortex_bucket_store_partitioner_extended_ranges_total`, `cortex_bucket_store_partitioner_expanded_ranges_total`, `cortex_bucket_store_partitioner_requested_ranges_total`, `cortex_bucket_store_partitioner_expanded_bytes_total`, `cortex_bucket_store_partitioner_requested_bytes_total` for `postings`, `series`, and `chunks`. #4095
+* [ENHANCEMENT] Store-gateway: reduce memory allocation rate when loading TSDB chunks from Memcached. #4074
 * [BUGFIX] Ingester: remove series from ephemeral storage even if there are no persistent series. #4052
 * [BUGFIX] Ingester: reuse memory when ingesting ephemeral series. #4072
 * [BUGFIX] Fix JSON and YAML marshalling of `ephemeral_series_matchers` field in `/runtime_config`. #4091
diff --git a/pkg/storage/tsdb/bucketcache/caching_bucket.go b/pkg/storage/tsdb/bucketcache/caching_bucket.go
index 47d459efc92..ad07e62ee06 100644
--- a/pkg/storage/tsdb/bucketcache/caching_bucket.go
+++ b/pkg/storage/tsdb/bucketcache/caching_bucket.go
@@ -24,15 +24,65 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/thanos-io/objstore"
 	"golang.org/x/sync/errgroup"
+
+	"github.com/grafana/mimir/pkg/util/pool"
 )
 
+type contextKey int
+
 const (
 	originCache  = "cache"
 	originBucket = "bucket"
+
+	memoryPoolContextKey contextKey = 0
 )
 
 var errObjNotFound = errors.Errorf("object not found")
 
+// WithMemoryPool returns a new context with a slab pool to be used as a cache.Allocator
+// implementation by the underlying cache client. Slabs are released back to p when the
+// io.ReadCloser associated with the Get or GetRange call is closed.
+func WithMemoryPool(ctx context.Context, p pool.Interface, slabSize int) context.Context {
+	return context.WithValue(ctx, memoryPoolContextKey, pool.NewSafeSlabPool[byte](p, slabSize))
+}
+
+func getMemoryPool(ctx context.Context) *pool.SafeSlabPool[byte] {
+	val := ctx.Value(memoryPoolContextKey)
+	if val == nil {
+		return nil
+	}
+
+	slabs, ok := val.(*pool.SafeSlabPool[byte])
+	if !ok {
+		return nil
+	}
+
+	return slabs
+}
+
+func getCacheOptions(slabs *pool.SafeSlabPool[byte]) []cache.Option {
+	var opts []cache.Option
+
+	if slabs != nil {
+		opts = append(opts, cache.WithAllocator(&slabPoolAdapter{pool: slabs}))
+	}
+
+	return opts
+}
+
+type slabPoolAdapter struct {
+	pool *pool.SafeSlabPool[byte]
+}
+
+func (s *slabPoolAdapter) Get(sz int) *[]byte {
+	b := s.pool.Get(sz)
+	return &b
+}
+
+func (s *slabPoolAdapter) Put(_ *[]byte) {
+	// no-op
+}
+
 // CachingBucket implementation that provides some caching features, based on passed configuration.
 type CachingBucket struct {
 	objstore.Bucket
@@ -220,11 +270,27 @@ func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, e
 	contentKey := cachingKeyContent(name)
 	existsKey := cachingKeyExists(name)
 
+	slabs := getMemoryPool(ctx)
+	cacheOpts := getCacheOptions(slabs)
+	releaseSlabs := true
+
+	// On a cache hit, the returned reader is responsible for freeing any allocated
+	// slabs. However, if the content key isn't a hit, the client might still have
+	// allocated memory for the exists key, and we need to free it in that case.
+	if slabs != nil {
+		defer func() {
+			if releaseSlabs {
+				slabs.Release()
+			}
+		}()
+	}
+
-	hits := cfg.cache.Fetch(ctx, []string{contentKey, existsKey})
+	hits := cfg.cache.Fetch(ctx, []string{contentKey, existsKey}, cacheOpts...)
 	if hits[contentKey] != nil {
 		cb.operationHits.WithLabelValues(objstore.OpGet, cfgName).Inc()
-		return objstore.NopCloserWithSize(bytes.NewBuffer(hits[contentKey])), nil
+
+		releaseSlabs = false
+		return &sizedSlabGetReader{bytes.NewBuffer(hits[contentKey]), slabs}, nil
 	}
 
 	// If we know that file doesn't exist, we can return that. Useful for deletion marks.
@@ -363,9 +429,23 @@ func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset
 		offsetKeys[off] = k
 	}
 
+	releaseSlabs := true
+	slabs := getMemoryPool(ctx)
+	cacheOpts := getCacheOptions(slabs)
+
+	// If there's an error after fetching things from cache but before we return the
+	// subrange reader, we're responsible for releasing any memory used by the slab pool.
+	if slabs != nil {
+		defer func() {
+			if releaseSlabs {
+				slabs.Release()
+			}
+		}()
+	}
+
 	// Try to get all subranges from the cache.
 	totalCachedBytes := int64(0)
-	hits := cfg.cache.Fetch(ctx, keys)
+	hits := cfg.cache.Fetch(ctx, keys, cacheOpts...)
 	for _, b := range hits {
 		totalCachedBytes += int64(len(b))
 	}
@@ -383,7 +463,8 @@ func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset
 		}
 	}
 
-	return io.NopCloser(newSubrangesReader(cfg.subrangeSize, offsetKeys, hits, offset, length)), nil
+	releaseSlabs = false
+	return newSubrangesReader(cfg.subrangeSize, offsetKeys, hits, offset, length, slabs), nil
 }
 
 type rng struct {
@@ -515,9 +596,12 @@ type subrangesReader struct {
 	// Remaining data to return from this reader. Once zero, this reader reports EOF.
 	remaining int64
+
+	// Pool of bytes used for cache results.
+	slabs *pool.SafeSlabPool[byte]
 }
 
-func newSubrangesReader(subrangeSize int64, offsetsKeys map[int64]string, subranges map[string][]byte, readOffset, remaining int64) *subrangesReader {
+func newSubrangesReader(subrangeSize int64, offsetsKeys map[int64]string, subranges map[string][]byte, readOffset, remaining int64, slabs *pool.SafeSlabPool[byte]) *subrangesReader {
 	return &subrangesReader{
 		subrangeSize: subrangeSize,
 		offsetsKeys:  offsetsKeys,
@@ -525,9 +609,19 @@ func newSubrangesReader(subrangeSize int64, offsetsKeys map[int64]string, subran
 		readOffset:   readOffset,
 		remaining:    remaining,
+
+		slabs: slabs,
 	}
 }
 
+func (c *subrangesReader) Close() error {
+	if c.slabs != nil {
+		c.slabs.Release()
+	}
+
+	return nil
+}
+
 func (c *subrangesReader) Read(p []byte) (n int, err error) {
 	if c.remaining <= 0 {
 		return 0, io.EOF
 	}
@@ -608,6 +702,25 @@ func (g *getReader) Read(p []byte) (n int, err error) {
 	return n, err
 }
 
+// sizedSlabGetReader wraps an existing io.Reader with a cleanup method to release
+// any allocated slabs back to a pool.SafeSlabPool when closed.
+type sizedSlabGetReader struct {
+	io.Reader
+	slabs *pool.SafeSlabPool[byte]
+}
+
+func (s *sizedSlabGetReader) Close() error {
+	if s.slabs != nil {
+		s.slabs.Release()
+	}
+
+	return nil
+}
+
+func (s *sizedSlabGetReader) ObjectSize() (int64, error) {
+	return objstore.TryToGetSize(s.Reader)
+}
+
 // JSONIterCodec encodes iter results into JSON. Suitable for root dir.
 type JSONIterCodec struct{}
diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go
index dcbf30ad696..e7904d35240 100644
--- a/pkg/storegateway/bucket.go
+++ b/pkg/storegateway/bucket.go
@@ -43,6 +43,7 @@ import (
 	"github.com/grafana/mimir/pkg/mimirpb"
 	"github.com/grafana/mimir/pkg/storage/sharding"
 	"github.com/grafana/mimir/pkg/storage/tsdb/block"
+	"github.com/grafana/mimir/pkg/storage/tsdb/bucketcache"
 	"github.com/grafana/mimir/pkg/storage/tsdb/metadata"
 	"github.com/grafana/mimir/pkg/storegateway/hintspb"
 	"github.com/grafana/mimir/pkg/storegateway/indexcache"
@@ -1722,7 +1723,7 @@ func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length i
 		return nil, errors.Errorf("unknown segment file for index %d", seq)
 	}
 
-	// Get a reader for the required range.
+	ctx = bucketcache.WithMemoryPool(ctx, chunkBytesSlicePool, chunkBytesSlabSize)
 	reader, err := b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length)
 	if err != nil {
 		return nil, errors.Wrap(err, "get range reader")
@@ -1748,6 +1749,7 @@ func (b *bucketBlock) chunkRangeReader(ctx context.Context, seq int, off, length
 		return nil, errors.Errorf("unknown segment file for index %d", seq)
 	}
 
+	ctx = bucketcache.WithMemoryPool(ctx, chunkBytesSlicePool, chunkBytesSlabSize)
 	return b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length)
 }
diff --git a/pkg/storegateway/series_chunks.go b/pkg/storegateway/series_chunks.go
index 4ff1c5af5da..8c0035342e8 100644
--- a/pkg/storegateway/series_chunks.go
+++ b/pkg/storegateway/series_chunks.go
@@ -23,8 +23,8 @@ const (
 	// number of chunks (across series).
 	seriesChunksSlabSize = 1000
 
-	// Selected so that an individual chunk's data typically fits within the slab size (16 KiB)
-	chunkBytesSlabSize = 16_384
+	// Selected so that chunks typically fit within the slab size (16 KiB)
+	chunkBytesSlabSize = 16 * 1024
 )
 
 var (
diff --git a/pkg/util/pool/pool.go b/pkg/util/pool/pool.go
index b0af639d8e6..a972e96a061 100644
--- a/pkg/util/pool/pool.go
+++ b/pkg/util/pool/pool.go
@@ -207,8 +207,9 @@ func (b *SlabPool[T]) Get(size int) []T {
 		b.slabs = append(b.slabs, slab)
 	}
 
-	// Resize the slab length and return a sub-slice.
+	// Resize the slab length to include the requested size.
 	*slab = (*slab)[:len(*slab)+size]
+	// Create a sub-slice of the slab with length and capacity equal to size.
 	return (*slab)[len(*slab)-size : len(*slab) : len(*slab)]
 }
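
Aside on the final pool.go hunk: the three-index ("full slice") expression caps the returned sub-slice's capacity at its length, so a caller that appends to the slice it got from Get forces a reallocation instead of silently overwriting bytes belonging to the next Get from the same slab. A standalone sketch of the semantics (illustrative only, not part of this change):

package example

func fullSliceExpr() {
	slab := make([]byte, 8, 32)

	// Three-index form slab[low:high:max]: length is high-low, capacity is max-low.
	sub := slab[4:8:8] // len(sub) == 4, cap(sub) == 4

	// cap(sub) == len(sub), so this append copies to a fresh backing array
	// rather than growing into slab[8:], which may belong to another caller.
	sub = append(sub, 0xff)
	_ = sub
}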
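
For reviewers, here is a minimal sketch of how the new API fits together end to end, mirroring what readChunkRange does in this diff. The function readRangePooled and the caller-supplied pool.Interface implementation are illustrative assumptions, not part of the change; bkt is assumed to be (or wrap) a bucketcache.CachingBucket whose cache client honors cache.WithAllocator.

package example

import (
	"context"
	"io"

	"github.com/thanos-io/objstore"

	"github.com/grafana/mimir/pkg/storage/tsdb/bucketcache"
	"github.com/grafana/mimir/pkg/util/pool"
)

// readRangePooled reads a byte range through the caching bucket, with cache
// results allocated from 16 KiB slabs taken from p instead of the heap.
func readRangePooled(ctx context.Context, bkt objstore.Bucket, p pool.Interface, name string, off, length int64) ([]byte, error) {
	// Attach a SafeSlabPool to the context; Fetch calls triggered by GetRange
	// use it as their cache.Allocator via the slabPoolAdapter.
	ctx = bucketcache.WithMemoryPool(ctx, p, 16*1024)

	r, err := bkt.GetRange(ctx, name, off, length)
	if err != nil {
		return nil, err
	}
	// Closing the reader releases the slabs back to p, so the bytes must be
	// consumed first. Read copies into the caller's buffer, so the result of
	// io.ReadAll remains valid after Close.
	defer r.Close()

	return io.ReadAll(r)
}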