Skip to content

Commit

Permalink
[Rhythm] Block builder performance improvement (#4596)
Browse files Browse the repository at this point in the history
* block builder pool prealloc byte slices

* Convert livetraces to store bytes to reduce GC scanning, Parallelize trace unmarshal, repool prealloc slices

* changelog
  • Loading branch information
mdisibio authored Jan 23, 2025
1 parent 00b37df commit b980aa7
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 62 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [ENHANCEMENT] Prevent queries in the ingester from blocking flushing traces to disk and memory spikes. [#4483](https://github.com/grafana/tempo/pull/4483) (@joe-elliott)
* [ENHANCEMENT] Update tempo operational dashboard for new block-builder and v2 traces api [#4559](https://github.com/grafana/tempo/pull/4559) (@mdisibio)
* [ENHANCEMENT] Improve block-builder performance by flushing blocks concurrently [#4565](https://github.com/grafana/tempo/pull/4565) (@mdisibio)
* [ENHANCEMENT] Improve block-builder performance [#4596](https://github.com/grafana/tempo/pull/4596) (@mdisibio)
* [ENHANCEMENT] Export new `tempo_ingest_group_partition_lag` metric from block-builders and metrics-generators [#4571](https://github.com/grafana/tempo/pull/4571) (@mdisibio)
* [ENHANCEMENT] Use distroless base container images for improved security [#4556](https://github.com/grafana/tempo/pull/4556) (@carles-grafana)
* [BUGFIX] Choose a default step for a gRPC streaming query range request if none is provided. [#4546](https://github.com/grafana/tempo/pull/4576) (@joe-elliott)
Expand Down
14 changes: 14 additions & 0 deletions modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,15 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, ov
startOffset = kgo.NewOffset().AtStart()
}

ends, err := b.kadm.ListEndOffsets(ctx, topic)
if err != nil {
return false, err
}
if err := ends.Error(); err != nil {
return false, err
}
lastPossibleMessage, lastPossibleMessageFound := ends.Lookup(topic, partition)

level.Info(b.logger).Log(
"msg", "consuming partition",
"partition", partition,
Expand Down Expand Up @@ -344,6 +353,11 @@ outer:
}

lastRec = rec

if lastPossibleMessageFound && lastRec.Offset >= lastPossibleMessage.Offset-1 {
// We reached the end so break now and avoid another poll which is expected to be empty.
break outer
}
}
}

Expand Down
3 changes: 0 additions & 3 deletions modules/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,6 @@ func BenchmarkBlockBuilder(b *testing.B) {
for i := 0; i < b.N; i++ {

var records []*kgo.Record

for i := 0; i < 1000; i++ {
records = append(records, sendReq(b, ctx, client)...)
}
Expand All @@ -520,8 +519,6 @@ func BenchmarkBlockBuilder(b *testing.B) {
size += len(r.Value)
}

b.ResetTimer()

err = bb.consume(ctx)
require.NoError(b, err)

Expand Down
9 changes: 1 addition & 8 deletions modules/blockbuilder/partition_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package blockbuilder

import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/tempodb"
Expand Down Expand Up @@ -64,12 +62,7 @@ func (p *writer) pushBytes(ts time.Time, tenant string, req *tempopb.PushBytesRe
}

for j, trace := range req.Traces {
tr := new(tempopb.Trace) // TODO - Pool?
if err := proto.Unmarshal(trace.Slice, tr); err != nil {
return fmt.Errorf("failed to unmarshal trace: %w", err)
}

if err := i.AppendTrace(req.Ids[j], tr, ts); err != nil {
if err := i.AppendTrace(req.Ids[j], trace.Slice, ts); err != nil {
return err
}
}
Expand Down
104 changes: 74 additions & 30 deletions modules/blockbuilder/tenant_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type tenantStore struct {
blocksMtx sync.Mutex
walBlocks []common.WALBlock

liveTraces *livetraces.LiveTraces
liveTraces *livetraces.LiveTraces[[]byte]
traceSizes *tracesizes.Tracker
}

Expand All @@ -73,7 +73,7 @@ func newTenantStore(tenantID string, partitionID, endTimestamp uint64, cfg Block
headBlockMtx: sync.Mutex{},
blocksMtx: sync.Mutex{},
enc: enc,
liveTraces: livetraces.New(),
liveTraces: livetraces.New[[]byte](func(b []byte) uint64 { return uint64(len(b)) }),
traceSizes: tracesizes.New(),
}

Expand Down Expand Up @@ -122,61 +122,105 @@ func (s *tenantStore) resetHeadBlock() error {
return nil
}

func (s *tenantStore) AppendTrace(traceID []byte, tr *tempopb.Trace, ts time.Time) error {
func (s *tenantStore) AppendTrace(traceID []byte, tr []byte, ts time.Time) error {
maxSz := s.overrides.MaxBytesPerTrace(s.tenantID)

for _, b := range tr.ResourceSpans {
if maxSz > 0 && !s.traceSizes.Allow(traceID, b.Size(), maxSz) {
// Record dropped spans due to trace too large
count := 0
if maxSz > 0 && !s.traceSizes.Allow(traceID, len(tr), maxSz) {
// Record dropped spans due to trace too large
// We have to unmarshal to count the number of spans.
// TODO - There might be a better way
t := &tempopb.Trace{}
if err := t.Unmarshal(tr); err != nil {
return err
}
count := 0
for _, b := range t.ResourceSpans {
for _, ss := range b.ScopeSpans {
count += len(ss.Spans)
}
overrides.RecordDiscardedSpans(count, reasonTraceTooLarge, s.tenantID)
continue
}

s.liveTraces.PushWithTimestamp(ts, traceID, b, 0)
overrides.RecordDiscardedSpans(count, reasonTraceTooLarge, s.tenantID)
return nil
}

s.liveTraces.PushWithTimestamp(ts, traceID, tr, 0)

return nil
}

func (s *tenantStore) CutIdle(since time.Time, immediate bool) error {
idle := s.liveTraces.CutIdle(since, immediate)

slices.SortFunc(idle, func(a, b *livetraces.LiveTrace) int {
slices.SortFunc(idle, func(a, b *livetraces.LiveTrace[[]byte]) int {
return bytes.Compare(a.ID, b.ID)
})

for _, e := range idle {
tr := &tempopb.Trace{
ResourceSpans: e.Batches,
}
var (
unmarshalWg = sync.WaitGroup{}
unmarshalErr = atomic.NewError(nil)
unmarshaled = make([]*tempopb.Trace, len(idle))
starts = make([]uint32, len(idle))
ends = make([]uint32, len(idle))
)

// Get trace timestamp bounds
var start, end uint64
for _, b := range tr.ResourceSpans {
for _, ss := range b.ScopeSpans {
for _, s := range ss.Spans {
if start == 0 || s.StartTimeUnixNano < start {
start = s.StartTimeUnixNano
// Unmarshal and process in parallel; each goroutine handles a 1/Nth share of the idle traces
for i := 0; i < len(idle) && i < flushConcurrency; i++ {
unmarshalWg.Add(1)
go func(i int) {
defer unmarshalWg.Done()

for j := i; j < len(idle); j += flushConcurrency {
tr := new(tempopb.Trace)

for _, b := range idle[j].Batches {
// This unmarshal appends the batches onto the existing tempopb.Trace
// so we don't need to allocate another container temporarily
err := tr.Unmarshal(b)
if err != nil {
unmarshalErr.Store(err)
return
}
if s.EndTimeUnixNano > end {
end = s.EndTimeUnixNano
}

// Get trace timestamp bounds
var start, end uint64
for _, b := range tr.ResourceSpans {
for _, ss := range b.ScopeSpans {
for _, s := range ss.Spans {
if start == 0 || s.StartTimeUnixNano < start {
start = s.StartTimeUnixNano
}
if s.EndTimeUnixNano > end {
end = s.EndTimeUnixNano
}
}
}
}

// Convert from unix nanos to unix seconds
starts[j] = uint32(start / uint64(time.Second))
ends[j] = uint32(end / uint64(time.Second))
unmarshaled[j] = tr
}
}
}(i)
}

// Convert from unix nanos to unix seconds
startSeconds := uint32(start / uint64(time.Second))
endSeconds := uint32(end / uint64(time.Second))
unmarshalWg.Wait()
if err := unmarshalErr.Load(); err != nil {
return err
}

if err := s.headBlock.AppendTrace(e.ID, tr, startSeconds, endSeconds); err != nil {
for i, tr := range unmarshaled {
if err := s.headBlock.AppendTrace(idle[i].ID, tr, starts[i], ends[i]); err != nil {
return err
}
}

// Return prealloc slices to the pool
for _, i := range idle {
tempopb.ReuseByteSlices(i.Batches)
}

err := s.headBlock.Flush()
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type Processor struct {
flushqueue *flushqueues.PriorityQueue

liveTracesMtx sync.Mutex
liveTraces *livetraces.LiveTraces
liveTraces *livetraces.LiveTraces[*v1.ResourceSpans]
traceSizes *tracesizes.Tracker

writer tempodb.Writer
Expand Down Expand Up @@ -104,7 +104,7 @@ func New(cfg Config, tenant string, wal *wal.WAL, writer tempodb.Writer, overrid
walBlocks: map[uuid.UUID]common.WALBlock{},
completeBlocks: map[uuid.UUID]*ingester.LocalBlock{},
flushqueue: flushqueues.NewPriorityQueue(metricFlushQueueSize.WithLabelValues(tenant)),
liveTraces: livetraces.New(),
liveTraces: livetraces.New[*v1.ResourceSpans](func(rs *v1.ResourceSpans) uint64 { return uint64(rs.Size()) }),
traceSizes: tracesizes.New(),
closeCh: make(chan struct{}),
wg: sync.WaitGroup{},
Expand Down
4 changes: 3 additions & 1 deletion pkg/ingest/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ func (d *Decoder) Decode(data []byte) (*tempopb.PushBytesRequest, error) {
}

func (d *Decoder) Reset() {
d.req.Reset()
// Retain slice capacity
d.req.Ids = d.req.Ids[:0]
d.req.Traces = d.req.Traces[:0]
}

// sovPush calculates the size of varint-encoded uint64.
Expand Down
40 changes: 23 additions & 17 deletions pkg/livetraces/livetraces.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,47 +8,53 @@ import (
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
)

type LiveTrace struct {
type LiveTraceBatchT interface {
*v1.ResourceSpans | []byte
}

type LiveTrace[T LiveTraceBatchT] struct {
ID []byte
timestamp time.Time
Batches []*v1.ResourceSpans
Batches []T

sz uint64
}

type LiveTraces struct {
type LiveTraces[T LiveTraceBatchT] struct {
hash hash.Hash64
Traces map[uint64]*LiveTrace
Traces map[uint64]*LiveTrace[T]

sz uint64
sz uint64
szFunc func(T) uint64
}

func New() *LiveTraces {
return &LiveTraces{
func New[T LiveTraceBatchT](sizeFunc func(T) uint64) *LiveTraces[T] {
return &LiveTraces[T]{
hash: fnv.New64(),
Traces: make(map[uint64]*LiveTrace),
Traces: make(map[uint64]*LiveTrace[T]),
szFunc: sizeFunc,
}
}

func (l *LiveTraces) token(traceID []byte) uint64 {
func (l *LiveTraces[T]) token(traceID []byte) uint64 {
l.hash.Reset()
l.hash.Write(traceID)
return l.hash.Sum64()
}

func (l *LiveTraces) Len() uint64 {
func (l *LiveTraces[T]) Len() uint64 {
return uint64(len(l.Traces))
}

func (l *LiveTraces) Size() uint64 {
func (l *LiveTraces[T]) Size() uint64 {
return l.sz
}

func (l *LiveTraces) Push(traceID []byte, batch *v1.ResourceSpans, max uint64) bool {
func (l *LiveTraces[T]) Push(traceID []byte, batch T, max uint64) bool {
return l.PushWithTimestamp(time.Now(), traceID, batch, max)
}

func (l *LiveTraces) PushWithTimestamp(ts time.Time, traceID []byte, batch *v1.ResourceSpans, max uint64) bool {
func (l *LiveTraces[T]) PushWithTimestamp(ts time.Time, traceID []byte, batch T, max uint64) bool {
token := l.token(traceID)

tr := l.Traces[token]
Expand All @@ -60,13 +66,13 @@ func (l *LiveTraces) PushWithTimestamp(ts time.Time, traceID []byte, batch *v1.R
return false
}

tr = &LiveTrace{
tr = &LiveTrace[T]{
ID: traceID,
}
l.Traces[token] = tr
}

sz := uint64(batch.Size())
sz := l.szFunc(batch)
tr.sz += sz
l.sz += sz

Expand All @@ -75,8 +81,8 @@ func (l *LiveTraces) PushWithTimestamp(ts time.Time, traceID []byte, batch *v1.R
return true
}

func (l *LiveTraces) CutIdle(idleSince time.Time, immediate bool) []*LiveTrace {
res := []*LiveTrace{}
func (l *LiveTraces[T]) CutIdle(idleSince time.Time, immediate bool) []*LiveTrace[T] {
res := []*LiveTrace[T]{}

for k, tr := range l.Traces {
if tr.timestamp.Before(idleSince) || immediate {
Expand Down
3 changes: 2 additions & 1 deletion pkg/livetraces/livetraces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"testing"
"time"

v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/util/test"
"github.com/stretchr/testify/require"
)

func TestLiveTracesSizesAndLen(t *testing.T) {
lt := New()
lt := New[*v1.ResourceSpans](func(rs *v1.ResourceSpans) uint64 { return uint64(rs.Size()) })

expectedSz := uint64(0)
expectedLen := uint64(0)
Expand Down

0 comments on commit b980aa7

Please sign in to comment.