Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make use of a per-call memory allocator for loading cached chunks #4074

Merged
merged 11 commits into from
Feb 1, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* [ENHANCEMENT] Docs: use long flag names in runbook commands. #4088
* [ENHANCEMENT] Query-frontend: log caller user agent in query stats logs. #4093
* [ENHANCEMENT] Store-gateway: add `data_type` label with values on `cortex_bucket_store_partitioner_extended_ranges_total`, `cortex_bucket_store_partitioner_expanded_ranges_total`, `cortex_bucket_store_partitioner_requested_ranges_total`, `cortex_bucket_store_partitioner_expanded_bytes_total`, `cortex_bucket_store_partitioner_requested_bytes_total` for `postings`, `series`, and `chunks`. #4095
* [ENHANCEMENT] Store-gateway: Reduce memory allocation rate when loading TSDB chunks from Memcached. #4074
* [BUGFIX] Ingester: remove series from ephemeral storage even if there are no persistent series. #4052
* [BUGFIX] Ingester: reuse memory when ingesting ephemeral series. #4072
* [BUGFIX] Fix JSON and YAML marshalling of `ephemeral_series_matchers` field in `/runtime_config`. #4091
Expand Down
123 changes: 118 additions & 5 deletions pkg/storage/tsdb/bucketcache/caching_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,65 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/objstore"
"golang.org/x/sync/errgroup"

"github.com/grafana/mimir/pkg/util/pool"
)

// contextKey is an unexported type for keys stored in a context.Context by
// this package, preventing collisions with keys defined elsewhere.
type contextKey int

const (
// Origin labels; presumably used to distinguish data served from cache vs
// fetched from the bucket (e.g. in metrics) — confirm at call sites.
originCache = "cache"
originBucket = "bucket"

// memoryPoolContextKey is the context key under which WithMemoryPool stores
// a per-call *pool.SafeSlabPool[byte].
memoryPoolContextKey contextKey = 0
)

var errObjNotFound = errors.Errorf("object not found")

// WithMemoryPool returns a copy of ctx carrying a slab pool that the underlying
// cache client can use as its cache.Allocator implementation. Slabs are handed
// back to p when the io.ReadCloser returned by the corresponding Get or
// GetRange call is closed.
func WithMemoryPool(ctx context.Context, p pool.Interface, slabSize int) context.Context {
	slabs := pool.NewSafeSlabPool[byte](p, slabSize)
	return context.WithValue(ctx, memoryPoolContextKey, slabs)
}

// getMemoryPool returns the per-call slab pool stored in ctx by WithMemoryPool,
// or nil when the context carries none (callers treat nil as "no pooling").
func getMemoryPool(ctx context.Context) *pool.SafeSlabPool[byte] {
	// A comma-ok type assertion on a nil interface value yields (nil, false),
	// so a single assertion covers both the "missing key" and the
	// "unexpected type" cases that the previous two-step check handled.
	slabs, _ := ctx.Value(memoryPoolContextKey).(*pool.SafeSlabPool[byte])
	return slabs
}

// getCacheOptions converts an optional slab pool into the option slice passed
// to the cache client's Fetch method. A nil pool produces no options.
func getCacheOptions(slabs *pool.SafeSlabPool[byte]) []cache.Option {
	if slabs == nil {
		return nil
	}
	return []cache.Option{cache.WithAllocator(&slabPoolAdapter{pool: slabs})}
}

// slabPoolAdapter adapts a *pool.SafeSlabPool[byte] to the allocator interface
// accepted by cache.WithAllocator.
type slabPoolAdapter struct {
	pool *pool.SafeSlabPool[byte]
}

// Get allocates a slice of sz bytes from the underlying slab pool.
func (s *slabPoolAdapter) Get(sz int) *[]byte {
	buf := s.pool.Get(sz)
	return &buf
}

// Put is intentionally a no-op: slab memory is reclaimed in bulk via
// SafeSlabPool.Release rather than one buffer at a time.
func (s *slabPoolAdapter) Put(_ *[]byte) {}

// CachingBucket implementation that provides some caching features, based on passed configuration.
type CachingBucket struct {
objstore.Bucket
Expand Down Expand Up @@ -220,11 +270,27 @@ func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, e

contentKey := cachingKeyContent(name)
existsKey := cachingKeyExists(name)
slabs := getMemoryPool(ctx)
cacheOpts := getCacheOptions(slabs)
releaseSlabs := true

// On cache hit, the returned reader is responsible for freeing any allocated
// slabs. However, if the content key isn't a hit the client still might have
// allocated memory for the exists key, and we need to free it in that case.
if slabs != nil {
defer func() {
if releaseSlabs {
slabs.Release()
}
}()
}

hits := cfg.cache.Fetch(ctx, []string{contentKey, existsKey})
hits := cfg.cache.Fetch(ctx, []string{contentKey, existsKey}, cacheOpts...)
if hits[contentKey] != nil {
cb.operationHits.WithLabelValues(objstore.OpGet, cfgName).Inc()
return objstore.NopCloserWithSize(bytes.NewBuffer(hits[contentKey])), nil

releaseSlabs = false
return &sizedSlabGetReader{bytes.NewBuffer(hits[contentKey]), slabs}, nil
}

// If we know that file doesn't exist, we can return that. Useful for deletion marks.
Expand Down Expand Up @@ -363,9 +429,23 @@ func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset
offsetKeys[off] = k
}

releaseSlabs := true
slabs := getMemoryPool(ctx)
cacheOpts := getCacheOptions(slabs)

// If there's an error after fetching things from cache but before we return the subrange
// reader we're responsible for releasing any memory used by the slab pool.
if slabs != nil {
defer func() {
if releaseSlabs {
slabs.Release()
}
}()
}

// Try to get all subranges from the cache.
totalCachedBytes := int64(0)
hits := cfg.cache.Fetch(ctx, keys)
hits := cfg.cache.Fetch(ctx, keys, cacheOpts...)
for _, b := range hits {
totalCachedBytes += int64(len(b))
}
Expand All @@ -383,7 +463,8 @@ func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset
}
}

return io.NopCloser(newSubrangesReader(cfg.subrangeSize, offsetKeys, hits, offset, length)), nil
releaseSlabs = false
return newSubrangesReader(cfg.subrangeSize, offsetKeys, hits, offset, length, slabs), nil
}

type rng struct {
Expand Down Expand Up @@ -515,19 +596,32 @@ type subrangesReader struct {

// Remaining data to return from this reader. Once zero, this reader reports EOF.
remaining int64

// Pool of bytes used for cache results
slabs *pool.SafeSlabPool[byte]
}

func newSubrangesReader(subrangeSize int64, offsetsKeys map[int64]string, subranges map[string][]byte, readOffset, remaining int64) *subrangesReader {
// newSubrangesReader builds a reader that stitches cached subranges back into a
// contiguous stream, starting at readOffset and producing remaining bytes in
// total. slabs may be nil; when non-nil it is released by Close.
func newSubrangesReader(subrangeSize int64, offsetsKeys map[int64]string, subranges map[string][]byte, readOffset, remaining int64, slabs *pool.SafeSlabPool[byte]) *subrangesReader {
	r := &subrangesReader{
		subrangeSize: subrangeSize,
		offsetsKeys:  offsetsKeys,
		subranges:    subranges,
		readOffset:   readOffset,
		remaining:    remaining,
		slabs:        slabs,
	}
	return r
}

// Close releases any slab-pool memory backing the cached subranges. It always
// returns nil.
func (c *subrangesReader) Close() error {
	if c.slabs == nil {
		return nil
	}
	c.slabs.Release()
	return nil
}

func (c *subrangesReader) Read(p []byte) (n int, err error) {
if c.remaining <= 0 {
return 0, io.EOF
Expand Down Expand Up @@ -608,6 +702,25 @@ func (g *getReader) Read(p []byte) (n int, err error) {
return n, err
}

// sizedSlabGetReader wraps an io.Reader over cache-owned bytes. Closing it
// returns any allocated slabs to the pool.SafeSlabPool, and it exposes the
// object size for objstore helpers.
type sizedSlabGetReader struct {
	io.Reader
	slabs *pool.SafeSlabPool[byte]
}

// Close hands all slab memory back to the pool. It always returns nil.
func (s *sizedSlabGetReader) Close() error {
	if s.slabs == nil {
		return nil
	}
	s.slabs.Release()
	return nil
}

// ObjectSize reports the size of the wrapped object by delegating to
// objstore.TryToGetSize on the underlying reader.
func (s *sizedSlabGetReader) ObjectSize() (int64, error) {
	return objstore.TryToGetSize(s.Reader)
}

// JSONIterCodec encodes iter results into JSON. Suitable for root dir.
// NOTE(review): the codec's methods are defined elsewhere in this file.
type JSONIterCodec struct{}

Expand Down
4 changes: 3 additions & 1 deletion pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/storage/sharding"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
"github.com/grafana/mimir/pkg/storage/tsdb/bucketcache"
"github.com/grafana/mimir/pkg/storage/tsdb/metadata"
"github.com/grafana/mimir/pkg/storegateway/hintspb"
"github.com/grafana/mimir/pkg/storegateway/indexcache"
Expand Down Expand Up @@ -1722,7 +1723,7 @@ func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length i
return nil, errors.Errorf("unknown segment file for index %d", seq)
}

// Get a reader for the required range.
ctx = bucketcache.WithMemoryPool(ctx, chunkBytesSlicePool, chunkBytesSlabSize)
reader, err := b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length)
if err != nil {
return nil, errors.Wrap(err, "get range reader")
Expand All @@ -1748,6 +1749,7 @@ func (b *bucketBlock) chunkRangeReader(ctx context.Context, seq int, off, length
return nil, errors.Errorf("unknown segment file for index %d", seq)
}

ctx = bucketcache.WithMemoryPool(ctx, chunkBytesSlicePool, chunkBytesSlabSize)
return b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/storegateway/series_chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ const (
// number of chunks (across series).
seriesChunksSlabSize = 1000

// Selected so that an individual chunk's data typically fits within the slab size (16 KiB)
chunkBytesSlabSize = 16_384
// Selected so that chunks typically fit within the slab size (16 KiB)
chunkBytesSlabSize = 16 * 1024
)

var (
Expand Down
3 changes: 2 additions & 1 deletion pkg/util/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,9 @@ func (b *SlabPool[T]) Get(size int) []T {
b.slabs = append(b.slabs, slab)
}

// Resize the slab length and return a sub-slice.
// Resize the slab length to include the requested size
*slab = (*slab)[:len(*slab)+size]
// Create a subslice of the slab with length and capacity of size
return (*slab)[len(*slab)-size : len(*slab) : len(*slab)]
}

Expand Down