kafka replay speed: adjust batchingQueueCapacity #9344

Merged
11 changes: 8 additions & 3 deletions pkg/storage/ingest/pusher.go
@@ -24,7 +24,12 @@ import (
"github.com/grafana/mimir/pkg/util/spanlogger"
)

-const shardForSeriesBuffer = 2000 // TODO dimitarvdimitrov 2000 is arbitrary; the idea is that we don't block the goroutine calling PushToStorage while we're flushing. A linked list with a sync.Cond or something different would also work
+// batchingQueueCapacity controls how many batches can be enqueued for flushing.
+// We don't want to push batches in parallel; instead we prepare the next batch while the current one is being flushed, hence a small buffer.
+// For example, if we can flush 1 batch/sec, then preparing 2 batches/sec doesn't make us any faster.
+// This is our initial assumption, and there's potential in testing higher values if flush times prove highly variable, assuming we can preserve the order of the batches.
+// If there's high variability in the time to flush or in the time to batch, then this buffer might need to be increased.
+const batchingQueueCapacity = 5

type Pusher interface {
PushToStorage(context.Context, *mimirpb.WriteRequest) error
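
To illustrate the idea behind the small capacity: with a modest buffered channel, the producer prepares the next batch while the consumer flushes the current one, and buffering further ahead can't raise throughput because flushing stays the bottleneck. A minimal, self-contained sketch of that pipelining (illustrative names only, not code from this PR):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Capacity 1: one batch can wait while another is being flushed.
	batches := make(chan []string, 1)
	done := make(chan struct{})

	go func() {
		defer close(done)
		for b := range batches {
			time.Sleep(100 * time.Millisecond) // simulate a slow flush
			fmt.Printf("flushed %d series\n", len(b))
		}
	}()

	for i := 0; i < 5; i++ {
		// Preparing the next batch overlaps with the in-flight flush; a larger
		// buffer would only queue more work, it wouldn't flush any faster.
		batches <- []string{fmt.Sprintf("series_%d", i)}
	}
	close(batches)
	<-done
}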
@@ -332,7 +337,7 @@ func (c parallelStoragePusher) shardsFor(userID string) *parallelStorageShards {
}
// Use the same hashing function that's used for stripes in the TSDB. That way we make use of the low-contention property of stripes.
hashLabels := labels.Labels.Hash
-	p := newParallelStorageShards(c.metrics.numTimeSeriesPerFlush, c.numShards, c.batchSize, shardForSeriesBuffer, c.upstreamPusher, hashLabels)
+	p := newParallelStorageShards(c.metrics.numTimeSeriesPerFlush, c.numShards, c.batchSize, batchingQueueCapacity, c.upstreamPusher, hashLabels)
c.pushers[userID] = p
return p
}
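
As an aside on the hashing choice above: reusing the TSDB stripes hash means series spread evenly across shards and keep the stripes' low-contention property. A hypothetical helper (not part of this diff) showing how a series would be routed:

// shardForSeries is illustrative only. It assumes the same labels package
// already imported by this file (github.com/prometheus/prometheus/model/labels).
func shardForSeries(lbls labels.Labels, numShards int) int {
	return int(lbls.Hash() % uint64(numShards))
}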
@@ -462,7 +467,7 @@ type batchingQueue struct {
func newBatchingQueue(capacity int, batchSize int) *batchingQueue {
return &batchingQueue{
ch: make(chan flushableWriteRequest, capacity),
-		errCh:        make(chan error, capacity),
+		errCh:        make(chan error, capacity+1), // We check errs before pushing to the channel, so the buffer must be at least capacity+1 for the consumer to push all of its errors without relying on the producer to unblock it.
done: make(chan struct{}),
currentBatch: flushableWriteRequest{WriteRequest: &mimirpb.WriteRequest{Timeseries: mimirpb.PreallocTimeseriesSliceFromPool()}},
batchSize: batchSize,
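
The capacity+1 bound comes from counting in-flight work: up to capacity batches can sit in ch while one more is being flushed, and every one of them can fail. Because the producer only drains errCh before pushing, the consumer must be able to report all capacity+1 errors without blocking, or both goroutines could end up waiting on each other. A sketch of the producer side under that assumption (hypothetical names, not the actual implementation):

// push drains any pending flush error before enqueuing, mirroring the
// "check errs before pushing" rule described in the comment above.
func push(ch chan<- int, errCh <-chan error, batch int) error {
	select {
	case err := <-errCh:
		return err // surface a pending flush error instead of queuing more work
	default:
	}
	// No pending errors: it's safe to block on ch here, because errCh has room
	// (capacity+1) for every error the consumer could produce in the meantime.
	ch <- batch
	return nil
}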
39 changes: 39 additions & 0 deletions pkg/storage/ingest/pusher_test.go
@@ -6,6 +6,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"

@@ -638,6 +639,44 @@ func TestParallelStorageShards_ShardWriteRequest(t *testing.T) {
}
}

+func TestBatchingQueue_NoDeadlock(t *testing.T) {
+	capacity := 2
+	batchSize := 3
+	queue := newBatchingQueue(capacity, batchSize)
+
+	ctx := context.Background()
+	series := mockPreallocTimeseries("series_1")
+
+	// Start a goroutine to process the queue.
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		defer queue.Done()
+		for range queue.Channel() {
+			// Simulate processing time.
+			time.Sleep(50 * time.Millisecond)
+			queue.ErrorChannel() <- fmt.Errorf("mock error")
+		}
+	}()
+
+	// Add items to the queue.
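+	// batchSize*(capacity+1) items form capacity+1 full batches: enough to fill the
+	// queue's channel while one batch is in flight, which is exactly the load that
+	// could deadlock the queue if errCh were too small.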
+	for i := 0; i < batchSize*(capacity+1); i++ {
+		require.NoError(t, queue.AddToBatch(ctx, series))
+	}
+
+	// Close the queue to signal no more items will be added.
+	err := queue.Close()
+	require.ErrorContains(t, err, "mock error")
+
+	wg.Wait()
+
+	// Ensure the queue is empty and no deadlock occurred.
+	require.Len(t, queue.ch, 0)
+	require.Len(t, queue.errCh, 0)
+	require.Len(t, queue.currentBatch.Timeseries, 0)
+}

func TestBatchingQueue(t *testing.T) {
capacity := 5
batchSize := 3