From e6e5dd3d316f5f2001978bd0111ce4be2f616cdb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= <pstibrany@gmail.com>
Date: Wed, 8 Jun 2022 11:33:57 +0200
Subject: [PATCH] Update Prometheus with async chunk mapper changes. (#2043)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Update Prometheus with async chunk mapper changes.

Included changes:

https://github.com/grafana/mimir-prometheus/pull/131
https://github.com/grafana/mimir-prometheus/pull/247

These result in lower memory usage by the chunk mapper.
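
The chunkRefMap shrinking introduced below relies on the fact that the Go
runtime never releases a map's internal bucket memory once the map has grown,
even after all keys are deleted; the only way to give that memory back is to
drop the map and allocate a fresh one. A minimal standalone sketch of this
effect (illustration only, not part of the patch):

    package main

    import (
    	"fmt"
    	"runtime"
    )

    func heapInuseMiB() uint64 {
    	var ms runtime.MemStats
    	runtime.ReadMemStats(&ms)
    	return ms.HeapInuse >> 20
    }

    func main() {
    	m := make(map[int]struct{})
    	for i := 0; i < 1_000_000; i++ {
    		m[i] = struct{}{}
    	}
    	for k := range m {
    		delete(m, k)
    	}
    	runtime.GC()
    	fmt.Println("emptied map:", heapInuseMiB(), "MiB") // buckets are still allocated

    	m = make(map[int]struct{}) // what shrinkChunkRefMap effectively does
    	runtime.GC()
    	fmt.Println("fresh map:  ", heapInuseMiB(), "MiB") // old buckets are now freed
    }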

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
---
 CHANGELOG.md                                  |   1 +
 go.mod                                        |   2 +-
 go.sum                                        |   4 +-
 .../tsdb/chunks/chunk_write_queue.go          | 108 +++++++++++++--
 .../prometheus/tsdb/chunks/queue.go           | 127 ++++++++++++++++++
 vendor/modules.txt                            |   4 +-
 6 files changed, 227 insertions(+), 19 deletions(-)
 create mode 100644 vendor/github.com/prometheus/prometheus/tsdb/chunks/queue.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 79a51159dfb..94b86133191 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -27,6 +27,7 @@
 * [ENHANCEMENT] Blocks Storage, Alertmanager, Ruler: add support for a prefix to the bucket store (`*_storage.storage_prefix`). This enables using the same bucket for the three components. #1686 #1951
 * [ENHANCEMENT] Upgrade Docker base images to `alpine:3.16.0`. #2028
 * [ENHANCEMENT] Store-gateway: Add experimental configuration option for the store-gateway to attempt to pre-populate the file system cache when memory-mapping index-header files. Enabled with `-blocks-storage.bucket-store.index-header.map-populate-enabled=true`. #2019
+* [ENHANCEMENT] Chunk Mapper: reduce memory usage of async chunk mapper. #2043
 * [BUGFIX] Fix regexp parsing panic for regexp label matchers with start/end quantifiers. #1883
 * [BUGFIX] Ingester: fixed deceiving error log "failed to update cached shipped blocks after shipper initialisation", occurring for each new tenant in the ingester. #1893
 * [BUGFIX] Ring: fix bug where instances may appear unhealthy in the hash ring web UI even though they are not. #1933
diff --git a/go.mod b/go.mod
index 889656d20f8..4e3338d493e 100644
--- a/go.mod
+++ b/go.mod
@@ -226,7 +226,7 @@ replace git.apache.org/thrift.git => github.com/apache/thrift v0.0.0-20180902110
 replace github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab
 
 // Using a fork of Prometheus while we work on querysharding to avoid a dependency on the upstream.
-replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20220518151708-ceaa77f14d6a
+replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20220607154228-1e2d2fb2d8ce
 
 // Pin hashicorp dependencies, since after the Prometheus fork go mod tries to update them.
 replace github.com/hashicorp/go-immutable-radix => github.com/hashicorp/go-immutable-radix v1.2.0
diff --git a/go.sum b/go.sum
index 5defb3660c8..1fc0f08d963 100644
--- a/go.sum
+++ b/go.sum
@@ -1053,8 +1053,8 @@ github.com/grafana/e2e v0.1.1-0.20220519104354-1db01e4751fe h1:mxrRWDjKtob43xF9n
 github.com/grafana/e2e v0.1.1-0.20220519104354-1db01e4751fe/go.mod h1:+26VJWpczg2OU3D0537acnHSHzhJORpxOs6F+M27tZo=
 github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167 h1:PgEQkGHR4YimSCEGT5IoswN9gJKZDVskf+he6UClCLw=
 github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
-github.com/grafana/mimir-prometheus v0.0.0-20220518151708-ceaa77f14d6a h1:Pkqac/osviA8l3NuNLORELHtRueAtXKZPenfMcdSjKk=
-github.com/grafana/mimir-prometheus v0.0.0-20220518151708-ceaa77f14d6a/go.mod h1:W59JUgfj423JtdkiZLvblAJD4IQeE04y26z0CL7DVKc=
+github.com/grafana/mimir-prometheus v0.0.0-20220607154228-1e2d2fb2d8ce h1:4C+cNC/u97P+ugUpQfpg/PXdPWRG85/u2VYSaqOv2L8=
+github.com/grafana/mimir-prometheus v0.0.0-20220607154228-1e2d2fb2d8ce/go.mod h1:W59JUgfj423JtdkiZLvblAJD4IQeE04y26z0CL7DVKc=
 github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2 h1:uirlL/j72L93RhV4+mkWhjv0cov2I0MIgPOG9rMDr1k=
 github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2/go.mod h1:M5qHK+eWfAv8VR/265dIuEpL3fNfeC21tXXp9itM24A=
 github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
diff --git a/vendor/github.com/prometheus/prometheus/tsdb/chunks/chunk_write_queue.go b/vendor/github.com/prometheus/prometheus/tsdb/chunks/chunk_write_queue.go
index 628880b4c4a..071c92c85d0 100644
--- a/vendor/github.com/prometheus/prometheus/tsdb/chunks/chunk_write_queue.go
+++ b/vendor/github.com/prometheus/prometheus/tsdb/chunks/chunk_write_queue.go
@@ -16,12 +16,25 @@ package chunks
 import (
 	"errors"
 	"sync"
+	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
 
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 )
 
+const (
+	// Minimum recorded peak since the last shrinking of chunkWriteQueue.chunkRefMap to shrink it again.
+	chunkRefMapShrinkThreshold = 1000
+
+	// Minimum interval between shrinking of chunkWriteQueue.chunkRefMap.
+	chunkRefMapMinShrinkInterval = 10 * time.Minute
+
+	// Maximum size of segment used by the job queue (number of elements). With chunkWriteJob being 64 bytes,
+	// an empty queue that still holds one segment will use ~512 KiB.
+	maxChunkQueueSegmentSize = 8192
+)
+
 type chunkWriteJob struct {
 	cutFile   bool
 	seriesRef HeadSeriesRef
@@ -36,23 +49,30 @@ type chunkWriteJob struct {
 // Chunks that shall be written get added to the queue, which is consumed asynchronously.
 // Adding jobs to the queue is non-blocking as long as the queue isn't full.
 type chunkWriteQueue struct {
-	jobs chan chunkWriteJob
+	jobs *writeJobQueue
 
-	chunkRefMapMtx sync.RWMutex
-	chunkRefMap    map[ChunkDiskMapperRef]chunkenc.Chunk
+	chunkRefMapMtx        sync.RWMutex
+	chunkRefMap           map[ChunkDiskMapperRef]chunkenc.Chunk
+	chunkRefMapPeakSize   int       // Largest size that chunkRefMap has grown to since the last time we shrank it.
+	chunkRefMapLastShrink time.Time // When the chunkRefMap has been shrunk the last time.
 
-	isRunningMtx sync.Mutex // Protects the isRunning property.
-	isRunning    bool       // Used to prevent that new jobs get added to the queue when the chan is already closed.
+	// isRunningMtx serves two purposes:
+	// 1. It protects the isRunning field.
+	// 2. It serializes adding jobs to the chunkRefMap in the addJob() method. If the jobs queue is full, then addJob()
+	// blocks while holding this mutex, which guarantees that chunkRefMap never grows beyond the queue size + 1.
+	isRunningMtx sync.Mutex
+	isRunning    bool // Used to prevent new jobs from being added to the queue once it has been closed.
 
 	workerWg sync.WaitGroup
 
 	writeChunk writeChunkF
 
-	// Keeping three separate counters instead of only a single CounterVec to improve the performance of the critical
+	// Keeping separate counters instead of only a single CounterVec to improve the performance of the critical
 	// addJob() method which otherwise would need to perform a WithLabelValues call on the CounterVec.
 	adds      prometheus.Counter
 	gets      prometheus.Counter
 	completed prometheus.Counter
+	shrink    prometheus.Counter
 }
 
 // writeChunkF is a function which writes chunks; it is dynamic to allow mocking in tests.
@@ -67,14 +87,21 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu
 		[]string{"operation"},
 	)
 
+	segmentSize := size
+	if segmentSize > maxChunkQueueSegmentSize {
+		segmentSize = maxChunkQueueSegmentSize
+	}
+
 	q := &chunkWriteQueue{
-		jobs:        make(chan chunkWriteJob, size),
-		chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk, size),
-		writeChunk:  writeChunk,
+		jobs:                  newWriteJobQueue(size, segmentSize),
+		chunkRefMap:           make(map[ChunkDiskMapperRef]chunkenc.Chunk),
+		chunkRefMapLastShrink: time.Now(),
+		writeChunk:            writeChunk,
 
 		adds:      counters.WithLabelValues("add"),
 		gets:      counters.WithLabelValues("get"),
 		completed: counters.WithLabelValues("complete"),
+		shrink:    counters.WithLabelValues("shrink"),
 	}
 
 	if reg != nil {
@@ -90,7 +117,12 @@ func (c *chunkWriteQueue) start() {
 	go func() {
 		defer c.workerWg.Done()
 
-		for job := range c.jobs {
+		for {
+			job, ok := c.jobs.pop()
+			if !ok {
+				return
+			}
+
 			c.processJob(job)
 		}
 	}()
@@ -112,6 +144,42 @@ func (c *chunkWriteQueue) processJob(job chunkWriteJob) {
 	delete(c.chunkRefMap, job.ref)
 
 	c.completed.Inc()
+
+	c.shrinkChunkRefMap()
+}
+
+// shrinkChunkRefMap checks whether the conditions to shrink the chunkRefMap are met,
+// and if so, reinitializes chunkRefMap. The chunkRefMapMtx must be held when calling this method.
+//
+// We do this because the Go runtime doesn't release the internal memory used by a map after the map has been emptied.
+// To release that memory, we create a new map and let the old one be garbage collected.
+func (c *chunkWriteQueue) shrinkChunkRefMap() {
+	if len(c.chunkRefMap) > 0 {
+		// Can't shrink it while there is data in it.
+		return
+	}
+
+	if c.chunkRefMapPeakSize < chunkRefMapShrinkThreshold {
+		// Not shrinking it because it has not grown to the minimum threshold yet.
+		return
+	}
+
+	now := time.Now()
+
+	if now.Sub(c.chunkRefMapLastShrink) < chunkRefMapMinShrinkInterval {
+		// Not shrinking it because the minimum duration between shrink-events has not passed yet.
+		return
+	}
+
+	// Re-initialize the chunk ref map to half of the peak size that it has grown to since the last re-init event.
+	// We are trying to hit the sweet spot in the trade-off between initializing it to a very small size
+	// potentially resulting in many allocations to re-grow it, and initializing it to a large size potentially
+	// resulting in unused allocated memory.
+	c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.chunkRefMapPeakSize/2)
+
+	c.chunkRefMapPeakSize = 0
+	c.chunkRefMapLastShrink = now
+	c.shrink.Inc()
 }
 
 func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {
@@ -125,14 +193,26 @@ func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {
 	defer c.isRunningMtx.Unlock()
 
 	if !c.isRunning {
-		return errors.New("queue is not started")
+		return errors.New("queue is not running")
 	}
 
 	c.chunkRefMapMtx.Lock()
 	c.chunkRefMap[job.ref] = job.chk
+
+	// Keep track of the peak usage of c.chunkRefMap.
+	if len(c.chunkRefMap) > c.chunkRefMapPeakSize {
+		c.chunkRefMapPeakSize = len(c.chunkRefMap)
+	}
 	c.chunkRefMapMtx.Unlock()
 
-	c.jobs <- job
+	ok := c.jobs.push(job)
+	if !ok {
+		c.chunkRefMapMtx.Lock()
+		delete(c.chunkRefMap, job.ref)
+		c.chunkRefMapMtx.Unlock()
+
+		return errors.New("queue is closed")
+	}
 
 	return nil
 }
@@ -159,7 +239,7 @@ func (c *chunkWriteQueue) stop() {
 
 	c.isRunning = false
 
-	close(c.jobs)
+	c.jobs.close()
 
 	c.workerWg.Wait()
 }
@@ -171,7 +251,7 @@ func (c *chunkWriteQueue) queueIsEmpty() bool {
 func (c *chunkWriteQueue) queueIsFull() bool {
 	// When the queue is full and blocked on the writer, the chunkRefMap has one more job than the maximum queue size
 	// because one job is currently being processed and blocked in the writer.
-	return c.queueSize() == cap(c.jobs)+1
+	return c.queueSize() == c.jobs.maxSize+1
 }
 
 func (c *chunkWriteQueue) queueSize() int {
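
To put maxChunkQueueSegmentSize in perspective, a back-of-the-envelope
comparison (the 64-byte job size comes from the constant's comment above; the
queue size of 1,000,000 is only an example value):

    package main

    import "fmt"

    func main() {
    	const jobSize = 64          // bytes per chunkWriteJob, per the comment above
    	const queueSize = 1_000_000 // hypothetical configured queue size
    	const segmentSize = 8192    // maxChunkQueueSegmentSize

    	// Before: make(chan chunkWriteJob, queueSize) allocated the whole
    	// buffer up front, even while the queue stayed empty.
    	fmt.Printf("channel buffer: %d MiB\n", queueSize*jobSize/(1<<20)) // 61 MiB

    	// After: segments are allocated on demand; an idle queue retains at
    	// most one segment.
    	fmt.Printf("one segment:    %d KiB\n", segmentSize*jobSize/(1<<10)) // 512 KiB
    }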
diff --git a/vendor/github.com/prometheus/prometheus/tsdb/chunks/queue.go b/vendor/github.com/prometheus/prometheus/tsdb/chunks/queue.go
new file mode 100644
index 00000000000..23b38e7f27b
--- /dev/null
+++ b/vendor/github.com/prometheus/prometheus/tsdb/chunks/queue.go
@@ -0,0 +1,127 @@
+package chunks
+
+import "sync"
+
+// writeJobQueue is similar to a buffered channel of chunkWriteJob, but it manages its own buffers
+// to avoid using a lot of memory when it's empty. It does that by storing elements in segments
+// of equal size (segmentSize). When a segment is no longer used, references to it are removed,
+// so it can be garbage collected.
+type writeJobQueue struct {
+	maxSize     int
+	segmentSize int
+
+	mtx            sync.Mutex            // protects all following variables
+	pushed, popped *sync.Cond            // signalled when something is pushed into the queue or popped from it
+	first, last    *writeJobQueueSegment // pointer to first and last segment, if any
+	size           int                   // total size of the queue
+	closed         bool                  // after closing the queue, nothing can be pushed to it
+}
+
+type writeJobQueueSegment struct {
+	segment             []chunkWriteJob
+	nextRead, nextWrite int                   // index of next read and next write in this segment.
+	nextSegment         *writeJobQueueSegment // next segment, if any
+}
+
+func newWriteJobQueue(maxSize, segmentSize int) *writeJobQueue {
+	if maxSize <= 0 || segmentSize <= 0 {
+		panic("invalid queue")
+	}
+
+	q := &writeJobQueue{
+		maxSize:     maxSize,
+		segmentSize: segmentSize,
+	}
+
+	q.pushed = sync.NewCond(&q.mtx)
+	q.popped = sync.NewCond(&q.mtx)
+	return q
+}
+
+func (q *writeJobQueue) close() {
+	q.mtx.Lock()
+	defer q.mtx.Unlock()
+
+	q.closed = true
+
+	// unblock all blocked goroutines
+	q.pushed.Broadcast()
+	q.popped.Broadcast()
+}
+
+// push blocks until there is space available in the queue, and then adds the job to the queue.
+// If the queue is closed, or gets closed while waiting for space, push returns false.
+func (q *writeJobQueue) push(job chunkWriteJob) bool {
+	q.mtx.Lock()
+	defer q.mtx.Unlock()
+
+	// wait until the queue has free space or is closed
+	for !q.closed && q.size >= q.maxSize {
+		q.popped.Wait()
+	}
+
+	if q.closed {
+		return false
+	}
+
+	// Check if the last segment has more space for writing, and create a new one if not.
+	if q.last == nil || q.last.nextWrite >= q.segmentSize {
+		prevLast := q.last
+		q.last = &writeJobQueueSegment{
+			segment: make([]chunkWriteJob, q.segmentSize),
+		}
+
+		if prevLast != nil {
+			prevLast.nextSegment = q.last
+		}
+		if q.first == nil {
+			q.first = q.last
+		}
+	}
+
+	q.last.segment[q.last.nextWrite] = job
+	q.last.nextWrite++
+	q.size++
+	q.pushed.Signal()
+	return true
+}
+
+// pop returns the first job from the queue, and true.
+// If the queue is empty, pop blocks until a job is pushed (returns true), or until the queue is closed (returns false).
+// If the queue was already closed, pop first returns all remaining elements from the queue (with true), and only then returns false.
+func (q *writeJobQueue) pop() (chunkWriteJob, bool) {
+	q.mtx.Lock()
+	defer q.mtx.Unlock()
+
+	// wait until something is pushed to the queue, or the queue is closed.
+	for q.size == 0 {
+		if q.closed {
+			return chunkWriteJob{}, false
+		}
+
+		q.pushed.Wait()
+	}
+
+	res := q.first.segment[q.first.nextRead]
+	q.first.segment[q.first.nextRead] = chunkWriteJob{} // clear just-read element
+	q.first.nextRead++
+	q.size--
+
+	// If we have read all possible elements from the first segment, we can drop it.
+	if q.first.nextRead >= q.segmentSize {
+		q.first = q.first.nextSegment
+		if q.first == nil {
+			q.last = nil
+		}
+	}
+
+	q.popped.Signal()
+	return res, true
+}
+
+func (q *writeJobQueue) length() int {
+	q.mtx.Lock()
+	defer q.mtx.Unlock()
+
+	return q.size
+}
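
For reference, a hypothetical driver for writeJobQueue, mirroring how
chunkWriteQueue wires it up in start()/addJob()/stop() above (sketch only;
the exampleDrive function does not exist in the patch):

    package chunks

    import "sync"

    func exampleDrive() {
    	q := newWriteJobQueue(1000, 100) // maxSize, segmentSize

    	var wg sync.WaitGroup
    	wg.Add(1)
    	go func() {
    		defer wg.Done()
    		for {
    			// pop blocks until a job arrives; it returns false only once
    			// the queue has been closed and fully drained.
    			job, ok := q.pop()
    			if !ok {
    				return
    			}
    			_ = job // a real consumer would call processJob(job) here
    		}
    	}()

    	for i := 0; i < 10; i++ {
    		// push blocks while the queue is full, and returns false once closed.
    		if !q.push(chunkWriteJob{}) {
    			break
    		}
    	}

    	q.close() // the consumer still pops the queued jobs, then exits
    	wg.Wait()
    }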
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 01f5d561375..d26d6ec202e 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -714,7 +714,7 @@ github.com/prometheus/node_exporter/https
 github.com/prometheus/procfs
 github.com/prometheus/procfs/internal/fs
 github.com/prometheus/procfs/internal/util
-# github.com/prometheus/prometheus v1.8.2-0.20220308163432-03831554a519 => github.com/grafana/mimir-prometheus v0.0.0-20220518151708-ceaa77f14d6a
+# github.com/prometheus/prometheus v1.8.2-0.20220308163432-03831554a519 => github.com/grafana/mimir-prometheus v0.0.0-20220607154228-1e2d2fb2d8ce
 ## explicit; go 1.16
 github.com/prometheus/prometheus/config
 github.com/prometheus/prometheus/discovery
@@ -1222,7 +1222,7 @@ gopkg.in/yaml.v2
 gopkg.in/yaml.v3
 # git.apache.org/thrift.git => github.com/apache/thrift v0.0.0-20180902110319-2566ecd5d999
 # github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab
-# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20220518151708-ceaa77f14d6a
+# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20220607154228-1e2d2fb2d8ce
 # github.com/hashicorp/go-immutable-radix => github.com/hashicorp/go-immutable-radix v1.2.0
 # github.com/hashicorp/go-hclog => github.com/hashicorp/go-hclog v0.12.2
 # github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167