[Rhythm] Add concurrency to block-builder wal conversion and flushing (#4565)

* Add concurrency to block-builder wal conversion and flushing

* changelog

* lint
mdisibio authored Jan 16, 2025
1 parent 7cda43b commit 0eae105
Showing 3 changed files with 72 additions and 30 deletions.
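
The change applies one pattern in two places: fan work out to a bounded number of goroutines, record the last error atomically, and write results by index so no extra lock is needed. Below is a minimal, self-contained sketch of that pattern — the `flushAll`, `inputs`, and `maxJobs` names are illustrative, not part of the commit — assuming Tempo's `pkg/boundedwaitgroup` and `go.uber.org/atomic` behave as they are used in the diffs that follow.

package main

import (
	"context"
	"fmt"

	"github.com/grafana/tempo/pkg/boundedwaitgroup"
	"go.uber.org/atomic"
)

// flushAll processes every input concurrently, with at most maxJobs
// goroutines in flight. Results are written by index, so no mutex is
// needed; jobErr keeps whichever error was stored last.
func flushAll(ctx context.Context, inputs []string, maxJobs uint) ([]string, error) {
	var (
		results = make([]string, len(inputs))
		jobErr  = atomic.NewError(nil)
		wg      = boundedwaitgroup.New(maxJobs)
	)

	for i, in := range inputs {
		wg.Add(1) // blocks once maxJobs goroutines are already running
		go func(i int, in string) {
			defer wg.Done()
			if err := ctx.Err(); err != nil {
				jobErr.Store(err) // give up if the context was canceled
				return
			}
			results[i] = "converted:" + in // stand-in for the real conversion
		}(i, in)
	}

	wg.Wait() // every job has finished before results or errors are read
	if err := jobErr.Load(); err != nil {
		return nil, err
	}
	return results, nil
}

func main() {
	out, err := flushAll(context.Background(), []string{"a", "b", "c"}, 4)
	fmt.Println(out, err)
}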
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@
 * [CHANGE] **BREAKING CHANGE** Removed `internal_error` as a reason from `tempo_discarded_spans_total`. [#4554](https://github.com/grafana/tempo/pull/4554) (@joe-elliott)
 * [ENHANCEMENT] Update tempo operational dashboard for new block-builder and v2 traces api [#4559](https://github.com/grafana/tempo/pull/4559) (@mdisibio)
+* [ENHANCEMENT] Improve block-builder performance by flushing blocks concurrently [#4565](https://github.com/grafana/tempo/pull/4565) (@mdisibio)
 * [BUGFIX] Choose a default step for a gRPC streaming query range request if none is provided. [#4546](https://github.com/grafana/tempo/pull/4546) (@joe-elliott)
   Fix an issue where the tempo-cli was not correctly dumping exemplar results.
 * [BUGFIX] Fix performance bottleneck and file cleanup in block builder [#4550](https://github.com/grafana/tempo/pull/4550) (@mdisibio)
24 changes: 16 additions & 8 deletions modules/blockbuilder/blockbuilder_test.go
@@ -7,6 +7,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/go-kit/log"
 	"github.com/grafana/dskit/flagext"
 	"github.com/grafana/dskit/ring"
 	"github.com/grafana/dskit/services"
@@ -328,7 +329,12 @@ type ownEverythingSharder struct{}
 func (o *ownEverythingSharder) Owns(string) bool { return true }
 
 func newStore(ctx context.Context, t testing.TB) storage.Store {
+	return newStoreWithLogger(ctx, t, test.NewTestingLogger(t))
+}
+
+func newStoreWithLogger(ctx context.Context, t testing.TB, log log.Logger) storage.Store {
 	tmpDir := t.TempDir()
 
 	s, err := storage.NewStore(storage.Config{
 		Trace: tempodb.Config{
 			Backend: backend.Local,
@@ -348,7 +354,7 @@ func newStore(ctx context.Context, t testing.TB) storage.Store {
 			},
 			BlocklistPoll: 5 * time.Second,
 		},
-	}, nil, test.NewTestingLogger(t))
+	}, nil, log)
 	require.NoError(t, err)
 
 	s.EnablePolling(ctx, &ownEverythingSharder{})
@@ -483,14 +489,22 @@ func requireLastCommitEquals(t testing.TB, ctx context.Context, client *kgo.Client
 func BenchmarkBlockBuilder(b *testing.B) {
 	var (
 		ctx        = context.Background()
+		logger     = log.NewNopLogger()
 		_, address = testkafka.CreateCluster(b, 1, testTopic)
-		store      = newStore(ctx, b)
+		store      = newStoreWithLogger(ctx, b, logger)
 		cfg        = blockbuilderConfig(b, address)
 		client     = newKafkaClient(b, cfg.IngestStorageConfig.Kafka)
 	)
 
 	cfg.ConsumeCycleDuration = 1 * time.Hour
 
+	bb := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
+	defer func() { require.NoError(b, bb.stopping(nil)) }()
+
+	// Startup (without starting the background consume cycle)
+	err := bb.starting(ctx)
+	require.NoError(b, err)
+
 	b.ResetTimer()
 
 	for i := 0; i < b.N; i++ {
@@ -508,12 +522,6 @@
 
-	b.ResetTimer()
-
-	bb := New(cfg, test.NewTestingLogger(b), newPartitionRingReader(), &mockOverrides{}, store)
-
-	// Startup (without starting the background consume cycle)
-	err := bb.starting(ctx)
-	require.NoError(b, err)
-
 	err = bb.consume(ctx)
 	require.NoError(b, err)
 
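
In the benchmark, construction and startup of the block builder move ahead of `b.ResetTimer()`, and the testing logger is swapped for `log.NewNopLogger()`, so the timed loop measures only the consume path. A sketch of that shape — `setup` and `doWork` are hypothetical stand-ins, not the commit's symbols:

package bench

import (
	"testing"

	"github.com/go-kit/log"
)

type worker struct{ logger log.Logger }

// setup stands in for expensive one-time construction (e.g. building the
// block builder and running its starting phase).
func setup(l log.Logger) *worker { return &worker{logger: l} }

// doWork stands in for the operation under test (e.g. one consume cycle).
func (w *worker) doWork() error { return nil }

func BenchmarkDoWork(b *testing.B) {
	logger := log.NewNopLogger() // a nop logger keeps log I/O out of the numbers
	w := setup(logger)

	b.ResetTimer() // exclude all setup cost from the measurement

	for i := 0; i < b.N; i++ {
		if err := w.doWork(); err != nil {
			b.Fatal(err)
		}
	}
}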
77 changes: 55 additions & 22 deletions modules/blockbuilder/tenant_store.go
@@ -12,6 +12,7 @@ import (
 	"github.com/google/uuid"
 	"github.com/grafana/tempo/modules/blockbuilder/util"
 	"github.com/grafana/tempo/modules/overrides"
+	"github.com/grafana/tempo/pkg/boundedwaitgroup"
 	"github.com/grafana/tempo/pkg/livetraces"
 	"github.com/grafana/tempo/pkg/model"
 	"github.com/grafana/tempo/pkg/tempopb"
@@ -23,6 +24,7 @@ import (
 	"github.com/grafana/tempo/tempodb/wal"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
+	"go.uber.org/atomic"
 )
 
 var metricBlockBuilderFlushedBlocks = promauto.NewCounterVec(
@@ -33,7 +35,10 @@ var metricBlockBuilderFlushedBlocks = promauto.NewCounterVec(
 	}, []string{"tenant"},
 )
 
-const reasonTraceTooLarge = "trace_too_large"
+const (
+	reasonTraceTooLarge = "trace_too_large"
+	flushConcurrency    = 4
+)
 
 // TODO - This needs locking
 type tenantStore struct {
@@ -192,34 +197,62 @@ func (s *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error {
 	s.blocksMtx.Lock()
 	defer s.blocksMtx.Unlock()
 
-	completeBlocks := make([]tempodb.WriteableBlock, 0, len(s.walBlocks))
-	// Write all blocks
-	for _, block := range s.walBlocks {
-		completeBlock, err := s.buildWriteableBlock(ctx, block)
-		if err != nil {
-			return err
-		}
+	var (
+		completeBlocks = make([]tempodb.WriteableBlock, len(s.walBlocks))
+		jobErr         = atomic.NewError(nil)
+		wg             = boundedwaitgroup.New(flushConcurrency)
+	)
+
+	// Convert WALs to backend blocks
+	for i, block := range s.walBlocks {
+		wg.Add(1)
+		go func(i int, block common.WALBlock) {
+			defer wg.Done()
+
+			completeBlock, err := s.buildWriteableBlock(ctx, block)
+			if err != nil {
+				jobErr.Store(err)
+				return
+			}
 
-		err = block.Clear()
-		if err != nil {
-			return err
-		}
+			err = block.Clear()
+			if err != nil {
+				jobErr.Store(err)
+				return
+			}
+
+			completeBlocks[i] = completeBlock
+		}(i, block)
 	}
 
-		completeBlocks = append(completeBlocks, completeBlock)
+	wg.Wait()
+	if err := jobErr.Load(); err != nil {
+		return err
+	}
+
-	level.Info(s.logger).Log("msg", "writing blocks to storage", "num_blocks", len(completeBlocks))
+	// Write all blocks to the store
+	level.Info(s.logger).Log("msg", "writing blocks to storage", "num_blocks", len(completeBlocks))
 	for _, block := range completeBlocks {
-		level.Info(s.logger).Log("msg", "writing block to storage", "block_id", block.BlockMeta().BlockID.String())
-		if err := store.WriteBlock(ctx, block); err != nil {
-			return err
-		}
-		metricBlockBuilderFlushedBlocks.WithLabelValues(s.tenantID).Inc()
+		wg.Add(1)
+		go func(block tempodb.WriteableBlock) {
+			defer wg.Done()
+			level.Info(s.logger).Log("msg", "writing block to storage", "block_id", block.BlockMeta().BlockID.String())
+			if err := store.WriteBlock(ctx, block); err != nil {
+				jobErr.Store(err)
+				return
+			}
 
-		if err := s.wal.LocalBackend().ClearBlock((uuid.UUID)(block.BlockMeta().BlockID), s.tenantID); err != nil {
-			return err
-		}
+			metricBlockBuilderFlushedBlocks.WithLabelValues(s.tenantID).Inc()
+
+			if err := s.wal.LocalBackend().ClearBlock((uuid.UUID)(block.BlockMeta().BlockID), s.tenantID); err != nil {
+				jobErr.Store(err)
+			}
+		}(block)
 	}
 
+	wg.Wait()
+	if err := jobErr.Load(); err != nil {
+		return err
+	}
+
 	// Clear the blocks
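
One property of this error handling worth noting: `atomic.Error` holds a single value, so if several goroutines fail, only the last `Store` survives. That is acceptable here because any error aborts the flush and the caller sees a failure either way. A small sketch of those semantics, with illustrative error values:

package main

import (
	"errors"
	"fmt"

	"go.uber.org/atomic"
)

func main() {
	jobErr := atomic.NewError(nil)

	// Several workers may each store an error; Load returns whichever
	// value was stored last, so only one failure is reported.
	jobErr.Store(errors.New("convert failed"))
	jobErr.Store(errors.New("clear failed"))

	if err := jobErr.Load(); err != nil {
		fmt.Println("flush aborted:", err) // prints: flush aborted: clear failed
	}
}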
