Ingester/Generator Live trace cleanup #4365

Merged
merged 11 commits on Nov 22, 2024
7 changes: 5 additions & 2 deletions CHANGELOG.md
@@ -62,6 +62,9 @@
* [ENHANCEMENT] Reduce ingester working set by improving prealloc behavior. [#4344](https://github.com/grafana/tempo/pull/4344) (@joe-elliott)
* [ENHANCEMENT] Use Prometheus fast regexp for TraceQL regular expression matchers. [#4329](https://github.com/grafana/tempo/pull/4329) (@joe-elliott)
**BREAKING CHANGE** All regular expression matchers will now be fully anchored. `span.foo =~ "bar"` will now be evaluated as `span.foo =~ "^bar$"`
* [ENHANCEMENT] Reuse generator code to better refuse "too large" traces. [#4365](https://github.com/grafana/tempo/pull/4365) (@joe-elliott)
This causes the ingester to refuse oversized traces more aggressively and correctly. Also adds two metrics to better track bytes consumed per tenant in the ingester and metrics generator:
`tempo_metrics_generator_live_trace_bytes` and `tempo_ingester_live_trace_bytes`.
* [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen)
* [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott)
* [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno)
@@ -71,9 +74,9 @@
* [BUGFIX] Fix several issues with exemplar values for traceql metrics [#4366](https://github.com/grafana/tempo/pull/4366) (@mdisibio)
* [BUGFIX] Skip computing exemplars for instant queries. [#4204](https://github.com/grafana/tempo/pull/4204) (@javiermolinar)
* [BUGFIX] Gave context to orphaned spans related to various maintenance processes. [#4260](https://github.com/grafana/tempo/pull/4260) (@joe-elliott)
* [BUGFIX] Utilize S3Pass and S3User parameters in tempo-cli options, which were previously unused in the code. [#44236](https://github.com/grafana/tempo/pull/4259) (@faridtmammadov)
* [BUGFIX] Initialize histogram buckets to 0 to avoid downsampling. [#4366](https://github.com/grafana/tempo/pull/4366) (@javiermolinar)

* [BUGFIX] Utilize S3Pass and S3User parameters in tempo-cli options, which were previously unused in the code. [#4259](https://github.com/grafana/tempo/pull/4259) (@faridtmammadov)
* [BUGFIX] Fixed an issue in the generator where the first batch was counted 2x against a trace's size. [#4365](https://github.com/grafana/tempo/pull/4365) (@joe-elliott)

# v2.6.1

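The "Reuse generator code" entry above refers to `pkg/tracesizes`, a package now shared by the ingester and the metrics-generator. Its implementation is not part of this diff, so the following is only a minimal sketch inferred from the call sites that appear later in the diff (`tracesizes.New()`, `Tracker.Allow(id, size, max)`, `Tracker.ClearIdle(since)`); the real package may differ.

```go
// Hypothetical sketch of pkg/tracesizes, inferred from its call sites in this PR.
package tracesizes

import (
	"hash/fnv"
	"sync"
	"time"
)

type traceSize struct {
	size       int
	lastUpdate time.Time
}

// Tracker accumulates the bytes pushed per trace so the ingester and the
// metrics-generator can refuse traces that exceed max_bytes_per_trace.
type Tracker struct {
	mtx   sync.Mutex
	sizes map[uint64]*traceSize
}

func New() *Tracker {
	return &Tracker{sizes: map[uint64]*traceSize{}}
}

func token(traceID []byte) uint64 {
	h := fnv.New64()
	_, _ = h.Write(traceID)
	return h.Sum64()
}

// Allow records sz bytes against traceID and reports whether the running total
// stays within max. A false return is what the ingester turns into a
// "trace too large" error before the trace is ever added to the live map.
func (t *Tracker) Allow(traceID []byte, sz, max int) bool {
	t.mtx.Lock()
	defer t.mtx.Unlock()

	tk := token(traceID)
	tr, ok := t.sizes[tk]
	if !ok {
		tr = &traceSize{}
		t.sizes[tk] = tr
	}
	tr.lastUpdate = time.Now()

	if tr.size+sz > max {
		return false
	}
	tr.size += sz
	return true
}

// ClearIdle drops entries that have not been updated since the given time,
// e.g. when the ingester cuts a block.
func (t *Tracker) ClearIdle(idleSince time.Time) {
	t.mtx.Lock()
	defer t.mtx.Unlock()

	for k, tr := range t.sizes {
		if tr.lastUpdate.Before(idleSince) {
			delete(t.sizes, k)
		}
	}
}
```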
3 changes: 3 additions & 0 deletions integration/e2e/limits_test.go
@@ -221,6 +221,9 @@ func TestQueryLimits(t *testing.T) {
batch.Spans = allSpans[i : i+1]
require.NoError(t, c.EmitBatch(context.Background(), batch))
util.CallFlush(t, tempo)
// this push, along with the double flush, is required to forget the too-large trace
require.NoError(t, c.EmitBatch(context.Background(), util.MakeThriftBatchWithSpanCount(1)))
util.CallFlush(t, tempo)
time.Sleep(2 * time.Second) // trace idle and flush time are both 1ms
}

13 changes: 13 additions & 0 deletions modules/generator/processor/localblocks/livetraces.go
@@ -12,11 +12,15 @@ type liveTrace struct {
id []byte
timestamp time.Time
Batches []*v1.ResourceSpans

sz uint64
}

type liveTraces struct {
hash hash.Hash64
traces map[uint64]*liveTrace

sz uint64
}

func newLiveTraces() *liveTraces {
@@ -36,6 +40,10 @@ func (l *liveTraces) Len() uint64 {
return uint64(len(l.traces))
}

func (l *liveTraces) Size() uint64 {
return l.sz
}

func (l *liveTraces) Push(traceID []byte, batch *v1.ResourceSpans, max uint64) bool {
token := l.token(traceID)

@@ -54,6 +62,10 @@ func (l *liveTraces) Push(traceID []byte, batch *v1.ResourceSpans, max uint64) b
l.traces[token] = tr
}

sz := uint64(batch.Size())
tr.sz += sz
l.sz += sz

tr.Batches = append(tr.Batches, batch)
tr.timestamp = time.Now()
return true
@@ -65,6 +77,7 @@ func (l *liveTraces) CutIdle(idleSince time.Time, immediate bool) []*liveTrace {
for k, tr := range l.traces {
if tr.timestamp.Before(idleSince) || immediate {
res = append(res, tr)
l.sz -= tr.sz
delete(l.traces, k)
}
}
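Taken together, `Push` grows both the per-trace and the aggregate byte counts, and `CutIdle` shrinks them again, so `Size()` always reflects the bytes held by live traces; `cutIdleTraces` in processor.go later exports this as the `tempo_metrics_generator_live_trace_bytes` gauge. A condensed usage sketch of that invariant, assuming the localblocks package context and the same test helpers as the new test file below:

```go
package localblocks

import (
	"fmt"
	"time"

	"github.com/grafana/tempo/pkg/util/test"
)

// exampleLiveTraceSizes is a sketch only; the new test file below exercises
// the same behavior more thoroughly.
func exampleLiveTraceSizes() {
	lt := newLiveTraces()

	id := test.ValidTraceID(nil)
	batch := test.MakeTrace(1, id).ResourceSpans[0]

	lt.Push(id, batch, 0)                          // 0 for the max argument, as in the new test below
	fmt.Println(lt.Size() == uint64(batch.Size())) // true: aggregate size matches the pushed batch

	lt.CutIdle(time.Now(), true) // an immediate cut removes the trace...
	fmt.Println(lt.Size())       // ...and the aggregate size drops back to 0
}
```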
46 changes: 46 additions & 0 deletions modules/generator/processor/localblocks/livetraces_test.go
@@ -0,0 +1,46 @@
package localblocks

import (
"math/rand/v2"
"testing"
"time"

"github.com/grafana/tempo/pkg/util/test"
"github.com/stretchr/testify/require"
)

func TestLiveTracesSizesAndLen(t *testing.T) {
lt := newLiveTraces()

expectedSz := uint64(0)
expectedLen := uint64(0)

for i := 0; i < 100; i++ {
id := test.ValidTraceID(nil)
tr := test.MakeTrace(rand.IntN(5)+1, id)

cutTime := time.Now()

// add some traces and confirm size/len
expectedLen++
for _, rs := range tr.ResourceSpans {
expectedSz += uint64(rs.Size())
lt.Push(id, rs, 0)
}

require.Equal(t, expectedSz, lt.Size())
require.Equal(t, expectedLen, lt.Len())

// cut some traces and confirm size/len
cutTraces := lt.CutIdle(cutTime, false)
for _, tr := range cutTraces {
for _, rs := range tr.Batches {
expectedSz -= uint64(rs.Size())
}
expectedLen--
}

require.Equal(t, expectedSz, lt.Size())
require.Equal(t, expectedLen, lt.Len())
}
}
6 changes: 6 additions & 0 deletions modules/generator/processor/localblocks/metrics.go
@@ -38,6 +38,12 @@ var (
Name: "live_traces",
Help: "Number of live traces",
}, []string{"tenant"})
metricLiveTraceBytes = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "live_trace_bytes",
Help: "Total bytes consumed by live traces",
}, []string{"tenant"})
metricDroppedTraces = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
6 changes: 4 additions & 2 deletions modules/generator/processor/localblocks/processor.go
@@ -16,6 +16,7 @@ import (
"github.com/google/uuid"
"github.com/grafana/tempo/modules/ingester"
"github.com/grafana/tempo/pkg/flushqueues"
"github.com/grafana/tempo/pkg/tracesizes"
"github.com/grafana/tempo/tempodb"
"go.opentelemetry.io/otel"

@@ -70,7 +71,7 @@ type Processor struct {

liveTracesMtx sync.Mutex
liveTraces *liveTraces
traceSizes *traceSizes
traceSizes *tracesizes.Tracker

writer tempodb.Writer
}
@@ -103,7 +104,7 @@ func New(cfg Config, tenant string, wal *wal.WAL, writer tempodb.Writer, overrid
completeBlocks: map[uuid.UUID]*ingester.LocalBlock{},
flushqueue: flushqueues.NewPriorityQueue(metricFlushQueueSize.WithLabelValues(tenant)),
liveTraces: newLiveTraces(),
traceSizes: newTraceSizes(),
traceSizes: tracesizes.New(),
closeCh: make(chan struct{}),
wg: sync.WaitGroup{},
cache: lru.New(100),
@@ -597,6 +598,7 @@ func (p *Processor) cutIdleTraces(immediate bool) error {

// Record live traces before flushing so we know the high water mark
metricLiveTraces.WithLabelValues(p.tenant).Set(float64(len(p.liveTraces.traces)))
metricLiveTraceBytes.WithLabelValues(p.tenant).Set(float64(p.liveTraces.Size()))

since := time.Now().Add(-p.Cfg.TraceIdlePeriod)
tracesToCut := p.liveTraces.CutIdle(since, immediate)
65 changes: 32 additions & 33 deletions modules/ingester/instance.go
@@ -17,13 +17,13 @@ import (
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"
"google.golang.org/grpc/codes"

"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/tracesizes"
"github.com/grafana/tempo/pkg/util/log"
"github.com/grafana/tempo/pkg/validation"
"github.com/grafana/tempo/tempodb"
@@ -64,6 +64,11 @@ var (
Name: "ingester_live_traces",
Help: "The current number of live traces per tenant.",
}, []string{"tenant"})
metricLiveTraceBytes = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "tempo",
Name: "ingester_live_trace_bytes",
Help: "The current number of bytes consumed by live traces per tenant.",
}, []string{"tenant"})
metricBlocksClearedTotal = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "tempo",
Name: "ingester_blocks_cleared_total",
@@ -82,10 +87,10 @@
)

type instance struct {
tracesMtx sync.Mutex
traces map[uint32]*liveTrace
traceSizes map[uint32]uint32
traceCount atomic.Int32
tracesMtx sync.Mutex
traces map[uint32]*liveTrace
traceSizes *tracesizes.Tracker
traceSizeBytes uint64

headBlockMtx sync.RWMutex
headBlock common.WALBlock
@@ -115,7 +120,7 @@ type instance struct {
func newInstance(instanceID string, limiter *Limiter, overrides ingesterOverrides, writer tempodb.Writer, l *local.Backend, dedicatedColumns backend.DedicatedColumns) (*instance, error) {
i := &instance{
traces: map[uint32]*liveTrace{},
traceSizes: map[uint32]uint32{},
traceSizes: tracesizes.New(),

instanceID: instanceID,
tracesCreatedTotal: metricTracesCreatedTotal.WithLabelValues(instanceID),
@@ -190,40 +195,34 @@ func (i *instance) PushBytes(ctx context.Context, id, traceBytes []byte) error {
return status.Errorf(codes.InvalidArgument, "%s is not a valid traceid", hex.EncodeToString(id))
}

// check for max traces before grabbing the lock to better load shed
err := i.limiter.AssertMaxTracesPerUser(i.instanceID, int(i.traceCount.Load()))
if err != nil {
return newMaxLiveTracesError(i.instanceID, err.Error())
}

return i.push(ctx, id, traceBytes)
}

func (i *instance) push(ctx context.Context, id, traceBytes []byte) error {
i.tracesMtx.Lock()
defer i.tracesMtx.Unlock()

tkn := i.tokenForTraceID(id)
err := i.limiter.AssertMaxTracesPerUser(i.instanceID, len(i.traces))
if err != nil {
return newMaxLiveTracesError(i.instanceID, err.Error())
}

maxBytes := i.limiter.limits.MaxBytesPerTrace(i.instanceID)
reqSize := len(traceBytes)

if maxBytes > 0 {
prevSize := int(i.traceSizes[tkn])
reqSize := len(traceBytes)
if prevSize+reqSize > maxBytes {
return newTraceTooLargeError(id, i.instanceID, maxBytes, reqSize)
}
if maxBytes > 0 && !i.traceSizes.Allow(id, reqSize, maxBytes) {
return newTraceTooLargeError(id, i.instanceID, maxBytes, reqSize)
}

trace := i.getOrCreateTrace(id, tkn, maxBytes)
tkn := i.tokenForTraceID(id)
trace := i.getOrCreateTrace(id, tkn)

err := trace.Push(ctx, i.instanceID, traceBytes)
err = trace.Push(ctx, i.instanceID, traceBytes)
if err != nil {
return err
}

if maxBytes > 0 {
i.traceSizes[tkn] += uint32(len(traceBytes))
}
i.traceSizeBytes += uint64(reqSize)

return nil
}
@@ -281,6 +280,8 @@ func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes

now := time.Now()
if i.lastBlockCut.Add(maxBlockLifetime).Before(now) || i.headBlock.DataLength() >= maxBlockBytes || immediate {
// Reset trace sizes when cutting block
i.traceSizes.ClearIdle(i.lastBlockCut)

// Final flush
err := i.headBlock.Flush()
@@ -485,15 +486,14 @@ func (i *instance) AddCompletingBlock(b common.WALBlock) {
// getOrCreateTrace will return a new trace object for the given request
//
// It must be called under the i.tracesMtx lock
func (i *instance) getOrCreateTrace(traceID []byte, fp uint32, maxBytes int) *liveTrace {
func (i *instance) getOrCreateTrace(traceID []byte, fp uint32) *liveTrace {
trace, ok := i.traces[fp]
if ok {
return trace
}

trace = newTrace(traceID, maxBytes)
trace = newTrace(traceID)
i.traces[fp] = trace
i.traceCount.Inc()

return trace
}
@@ -507,11 +507,6 @@ func (i *instance) tokenForTraceID(id []byte) uint32 {

// resetHeadBlock() should be called under lock
func (i *instance) resetHeadBlock() error {
// Reset trace sizes when cutting block
i.tracesMtx.Lock()
i.traceSizes = make(map[uint32]uint32, len(i.traceSizes))
i.tracesMtx.Unlock()

dedicatedColumns := i.getDedicatedColumns()

meta := &backend.BlockMeta{
@@ -549,17 +544,21 @@ func (i *instance) tracesToCut(cutoff time.Duration, immediate bool) []*liveTrac

// Set this before cutting to give a more accurate number.
metricLiveTraces.WithLabelValues(i.instanceID).Set(float64(len(i.traces)))
metricLiveTraceBytes.WithLabelValues(i.instanceID).Set(float64(i.traceSizeBytes))

cutoffTime := time.Now().Add(cutoff)
tracesToCut := make([]*liveTrace, 0, len(i.traces))

for key, trace := range i.traces {
if cutoffTime.After(trace.lastAppend) || immediate {
tracesToCut = append(tracesToCut, trace)

// decrease live trace bytes
i.traceSizeBytes -= trace.Size()

delete(i.traces, key)
}
}
i.traceCount.Store(int32(len(i.traces)))

return tracesToCut
}
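The ingester's `liveTrace` type itself is not touched by this diff, but the call sites above imply its new shape: `newTrace` no longer takes a max-bytes argument (the limit is now enforced up front by `tracesizes.Tracker.Allow`), and each trace tracks its own byte size so `tracesToCut` can decrement the instance-wide `traceSizeBytes` counter. A rough, hypothetical sketch under those assumptions; the real modules/ingester/trace.go has more fields and logic than shown here:

```go
package ingester

import (
	"context"
	"time"
)

// Hypothetical sketch of the ingester's liveTrace after this PR.
type liveTrace struct {
	traceID    []byte
	batches    [][]byte
	lastAppend time.Time
	sz         uint64
}

// newTrace no longer needs maxBytes: the per-trace size limit is checked by
// tracesizes.Tracker.Allow before Push is ever called.
func newTrace(traceID []byte) *liveTrace {
	return &liveTrace{traceID: traceID}
}

func (t *liveTrace) Push(_ context.Context, _ string, traceBytes []byte) error {
	t.batches = append(t.batches, traceBytes)
	t.sz += uint64(len(traceBytes))
	t.lastAppend = time.Now()
	return nil
}

// Size is what tracesToCut subtracts from the instance-wide traceSizeBytes
// counter when a trace is cut.
func (t *liveTrace) Size() uint64 {
	return t.sz
}
```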
5 changes: 0 additions & 5 deletions modules/ingester/instance_search_test.go
@@ -115,7 +115,6 @@ func TestInstanceSearchTraceQL(t *testing.T) {

// Test after appending to WAL
require.NoError(t, i.CutCompleteTraces(0, true))
assert.Equal(t, int(i.traceCount.Load()), len(i.traces))

sr, err = i.Search(context.Background(), req)
assert.NoError(t, err)
@@ -597,8 +596,6 @@ func writeTracesForSearch(t *testing.T, i *instance, spanName, tagKey, tagValue
// searchData will be nil if not
err = i.PushBytes(context.Background(), id, traceBytes)
require.NoError(t, err)

assert.Equal(t, int(i.traceCount.Load()), len(i.traces))
}

// traces have to be cut to show up in searches
@@ -805,8 +802,6 @@ func TestInstanceSearchMetrics(t *testing.T) {

err = i.PushBytes(context.Background(), id, traceBytes)
require.NoError(t, err)

assert.Equal(t, int(i.traceCount.Load()), len(i.traces))
}

search := func() *tempopb.SearchMetrics {