diff --git a/CHANGELOG.md b/CHANGELOG.md
index a3fbb467f8f..2698a27358e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -68,6 +68,7 @@
 * [BUGFIX] Pushes a 0 to classic histogram's counter when the series is new to allow Prometheus to start from a non-null value. [#4140](https://github.com/grafana/tempo/pull/4140) (@mapno)
 * [BUGFIX] Fix counter samples being downsampled by backdate to the previous minute the initial sample when the series is new [#4236](https://github.com/grafana/tempo/pull/4236) (@javiermolinar)
 * [BUGFIX] Fix traceql metrics time range handling at the cutoff between recent and backend data [#4257](https://github.com/grafana/tempo/issues/4257) (@mdisibio)
+* [BUGFIX] Fix several issues with exemplar values for traceql metrics [#4366](https://github.com/grafana/tempo/pull/4366) (@mdisibio)
 * [BUGFIX] Skip computing exemplars for instant queries. [#4204](https://github.com/grafana/tempo/pull/4204) (@javiermolinar)
 * [BUGFIX] Gave context to orphaned spans related to various maintenance processes. [#4260](https://github.com/grafana/tempo/pull/4260) (@joe-elliott)
 * [BUGFIX] Utilize S3Pass and S3User parameters in tempo-cli options, which were previously unused in the code. [#4259](https://github.com/grafana/tempo/pull/4259) (@faridtmammadov)
diff --git a/pkg/traceql/ast.go b/pkg/traceql/ast.go
index bb5a767e3c4..b743260920c 100644
--- a/pkg/traceql/ast.go
+++ b/pkg/traceql/ast.go
@@ -1106,33 +1106,23 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
 	var innerAgg func() VectorAggregator
 	var byFunc func(Span) (Static, bool)
 	var byFuncLabel string
-	var exemplarFn getExemplar
 
 	switch a.op {
 	case metricsAggregateCountOverTime:
 		innerAgg = func() VectorAggregator { return NewCountOverTimeAggregator() }
 		a.simpleAggregationOp = sumAggregation
-		exemplarFn = func(s Span) (float64, uint64) {
-			return math.NaN(), a.spanStartTimeMs(s)
-		}
+
 	case metricsAggregateMinOverTime:
 		innerAgg = func() VectorAggregator { return NewOverTimeAggregator(a.attr, minAggregation) }
 		a.simpleAggregationOp = minAggregation
-		exemplarFn = func(s Span) (float64, uint64) {
-			return math.NaN(), a.spanStartTimeMs(s)
-		}
+
 	case metricsAggregateMaxOverTime:
 		innerAgg = func() VectorAggregator { return NewOverTimeAggregator(a.attr, maxAggregation) }
 		a.simpleAggregationOp = maxAggregation
-		exemplarFn = func(s Span) (float64, uint64) {
-			return math.NaN(), a.spanStartTimeMs(s)
-		}
+
 	case metricsAggregateRate:
 		innerAgg = func() VectorAggregator { return NewRateAggregator(1.0 / time.Duration(q.Step).Seconds()) }
 		a.simpleAggregationOp = sumAggregation
-		exemplarFn = func(s Span) (float64, uint64) {
-			return math.NaN(), a.spanStartTimeMs(s)
-		}
 
 	case metricsAggregateHistogramOverTime, metricsAggregateQuantileOverTime:
 		// Histograms and quantiles are implemented as count_over_time() by(2^log2(attr)) for now
@@ -1144,16 +1134,9 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
 		case IntrinsicDurationAttribute:
 			// Optimal implementation for duration attribute
 			byFunc = a.bucketizeSpanDuration
-			exemplarFn = func(s Span) (float64, uint64) {
-				return float64(s.DurationNanos()), a.spanStartTimeMs(s)
-			}
 		default:
 			// Basic implementation for all other attributes
 			byFunc = a.bucketizeAttribute
-			exemplarFn = func(s Span) (float64, uint64) {
-				v, _ := FloatizeAttribute(s, a.attr)
-				return v, a.spanStartTimeMs(s)
-			}
 		}
 	}
 
@@ -1170,11 +1153,7 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
 	a.agg = NewGroupingAggregator(a.op.String(),
 		func() RangeAggregator { return NewStepAggregator(q.Start, q.End, q.Step, innerAgg) },
 		a.by, byFunc, byFuncLabel)
-	a.exemplarFn = exemplarFn
-}
-
-func (a *MetricsAggregate) spanStartTimeMs(s Span) uint64 {
-	return s.StartTimeUnixNanos() / uint64(time.Millisecond)
+	a.exemplarFn = exemplarFnFor(a.attr)
 }
 
 func (a *MetricsAggregate) bucketizeSpanDuration(s Span) (Static, bool) {
@@ -1209,6 +1188,39 @@ func (a *MetricsAggregate) bucketizeAttribute(s Span) (Static, bool) {
 	}
 }
 
+func exemplarFnFor(a Attribute) func(Span) (float64, uint64) {
+	switch a {
+	case IntrinsicDurationAttribute:
+		return exemplarDuration
+	case Attribute{}:
+		// This records exemplars without a value, and they
+		// are attached to the series at the end.
+		return exemplarNaN
+	default:
+		return exemplarAttribute(a)
+	}
+}
+
+func exemplarNaN(s Span) (float64, uint64) {
+	return math.NaN(), s.StartTimeUnixNanos() / uint64(time.Millisecond)
+}
+
+func exemplarDuration(s Span) (float64, uint64) {
+	v := float64(s.DurationNanos()) / float64(time.Second)
+	t := s.StartTimeUnixNanos() / uint64(time.Millisecond)
+	return v, t
+}
+
+// exemplarAttribute captures a closure around the attribute so it doesn't have to be passed along with every span.
+// should be more efficient.
+func exemplarAttribute(a Attribute) func(Span) (float64, uint64) {
+	return func(s Span) (float64, uint64) {
+		v, _ := FloatizeAttribute(s, a)
+		t := s.StartTimeUnixNanos() / uint64(time.Millisecond)
+		return v, t
+	}
+}
+
 func (a *MetricsAggregate) initSum(q *tempopb.QueryRangeRequest) {
 	// Currently all metrics are summed by job to produce
 	// intermediate results. This will change when adding min/max/topk/etc
diff --git a/pkg/traceql/engine_metrics.go b/pkg/traceql/engine_metrics.go
index f997aac8112..94148cd3726 100644
--- a/pkg/traceql/engine_metrics.go
+++ b/pkg/traceql/engine_metrics.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"math"
+	"math/rand"
 	"sort"
 	"sync"
 	"time"
@@ -1053,7 +1054,7 @@ func (e *MetricsEvalulator) Do(ctx context.Context, f SpansetFetcher, fetcherSta
 		}
 
 		if len(ss.Spans) > 0 && e.sampleExemplar(ss.TraceID) {
-			e.metricsPipeline.observeExemplar(ss.Spans[0]) // Randomly sample the first span
+			e.metricsPipeline.observeExemplar(ss.Spans[rand.Intn(len(ss.Spans))])
 		}
 
 		e.mtx.Unlock()
@@ -1232,19 +1233,25 @@ func (b *SimpleAggregator) aggregateExemplars(ts *tempopb.TimeSeries, existing *
 				Value: StaticFromAnyValue(l.Value),
 			})
 		}
-		value := exemplar.Value
-		if math.IsNaN(value) {
-			value = 0 // TODO: Use the value of the series at the same timestamp
-		}
 		existing.Exemplars = append(existing.Exemplars, Exemplar{
 			Labels:      labels,
-			Value:       value,
+			Value:       exemplar.Value,
 			TimestampMs: uint64(exemplar.TimestampMs),
 		})
 	}
 }
 
 func (b *SimpleAggregator) Results() SeriesSet {
+	// Attach placeholder exemplars to the output
+	for _, ts := range b.ss {
+		for i, e := range ts.Exemplars {
+			if math.IsNaN(e.Value) {
+				interval := IntervalOfMs(int64(e.TimestampMs), b.start, b.end, b.step)
+				ts.Exemplars[i].Value = ts.Values[interval]
+			}
+		}
+	}
+
 	return b.ss
 }
 
diff --git a/pkg/traceql/engine_metrics_average.go b/pkg/traceql/engine_metrics_average.go
index 0b6d934a7cd..754e3fd6ac9 100644
--- a/pkg/traceql/engine_metrics_average.go
+++ b/pkg/traceql/engine_metrics_average.go
@@ -33,12 +33,8 @@ func newAverageOverTimeMetricsAggregator(attr Attribute, by []Attribute) *averag
 }
 
 func (a *averageOverTimeAggregator) init(q *tempopb.QueryRangeRequest, mode AggregateMode) {
-	exemplarFn := func(s Span) (float64, uint64) {
-		return math.NaN(), a.spanStartTimeMs(s)
-	}
-
 	a.seriesAgg = &averageOverTimeSeriesAggregator{
-		weightedAverageSeries: make(map[string]averageSeries),
+		weightedAverageSeries: make(map[string]*averageSeries),
 		len:                   IntervalCount(q.Start, q.End, q.Step),
 		start:                 q.Start,
 		end:                   q.End,
@@ -50,8 +46,8 @@ func (a *averageOverTimeAggregator) init(q *tempopb.QueryRangeRequest, mode Aggr
 		a.agg = newAvgOverTimeSpanAggregator(a.attr, a.by, q.Start, q.End, q.Step)
 	}
 
-	a.exemplarFn = exemplarFn
 	a.mode = mode
+	a.exemplarFn = exemplarFnFor(a.attr)
 }
 
 func (a *averageOverTimeAggregator) observe(span Span) {
@@ -110,10 +106,6 @@ func (a *averageOverTimeAggregator) validate() error {
 	return nil
 }
 
-func (a *averageOverTimeAggregator) spanStartTimeMs(s Span) uint64 {
-	return s.StartTimeUnixNanos() / uint64(time.Millisecond)
-}
-
 func (a *averageOverTimeAggregator) String() string {
 	s := strings.Builder{}
 
@@ -138,7 +130,7 @@ func (a *averageOverTimeAggregator) String() string {
 }
 
 type averageOverTimeSeriesAggregator struct {
-	weightedAverageSeries map[string]averageSeries
+	weightedAverageSeries map[string]*averageSeries
 	len                   int
 	start, end, step      uint64
 	exemplarBuckets       *bucketSet
@@ -279,7 +271,8 @@ func (b *averageOverTimeSeriesAggregator) Combine(in []*tempopb.TimeSeries) {
 			countPosMapper[avgSeriesPromLabel] = i
 		} else if !ok {
 			promLabels := getLabels(ts.Labels, "")
-			b.weightedAverageSeries[ts.PromLabels] = newAverageSeries(b.len, len(ts.Exemplars), promLabels)
+			s := newAverageSeries(b.len, len(ts.Exemplars), promLabels)
+			b.weightedAverageSeries[ts.PromLabels] = &s
 		}
 	}
 	for _, ts := range in {
@@ -302,7 +295,7 @@ func (b *averageOverTimeSeriesAggregator) Combine(in []*tempopb.TimeSeries) {
 	}
 }
 
-func (b *averageOverTimeSeriesAggregator) aggregateExemplars(ts *tempopb.TimeSeries, existing averageSeries) {
+func (b *averageOverTimeSeriesAggregator) aggregateExemplars(ts *tempopb.TimeSeries, existing *averageSeries) {
 	for _, exemplar := range ts.Exemplars {
 		if b.exemplarBuckets.testTotal() {
 			break
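Note on the `SimpleAggregator.Results()` change above: exemplars recorded by `exemplarNaN` carry `NaN` as a placeholder value, and `Results()` backfills them with the value of their own series at the step interval containing the exemplar timestamp. The following standalone sketch illustrates that backfill; the `exemplar`/`series` types and the `intervalOfMs` helper are simplified stand-ins for Tempo's internals (`Exemplar`, `tempopb.TimeSeries`, `IntervalOfMs`), not the actual API.

```go
package main

import (
	"fmt"
	"math"
)

// exemplar and series are simplified stand-ins for Tempo's internal types.
type exemplar struct {
	Value       float64
	TimestampMs uint64
}

type series struct {
	Values    []float64  // one value per step interval
	Exemplars []exemplar // exemplars collected for this series
}

// intervalOfMs maps a timestamp in milliseconds to its step interval index
// (a simplified stand-in for the IntervalOfMs call used in the patch).
func intervalOfMs(tsMs int64, startMs, endMs, stepMs uint64) int {
	t := uint64(tsMs)
	if t < startMs {
		t = startMs
	}
	if t >= endMs {
		t = endMs - 1
	}
	return int((t - startMs) / stepMs)
}

// resolvePlaceholders mirrors what Results() now does: exemplars whose value
// is NaN get the value of the series at the interval containing their timestamp.
func resolvePlaceholders(s *series, startMs, endMs, stepMs uint64) {
	for i, e := range s.Exemplars {
		if math.IsNaN(e.Value) {
			idx := intervalOfMs(int64(e.TimestampMs), startMs, endMs, stepMs)
			s.Exemplars[i].Value = s.Values[idx]
		}
	}
}

func main() {
	// Three 10s intervals starting at t=0; the exemplar at t=15s was recorded without a value.
	s := &series{
		Values:    []float64{4, 7, 2},
		Exemplars: []exemplar{{Value: math.NaN(), TimestampMs: 15_000}},
	}
	resolvePlaceholders(s, 0, 30_000, 10_000)
	fmt.Println(s.Exemplars[0].Value) // 7: the series value in the exemplar's interval
}
```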
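The `engine_metrics_average.go` changes switch `weightedAverageSeries` from struct values to pointers (`map[string]*averageSeries`), and `aggregateExemplars` now receives a `*averageSeries`. A likely motivation, inferred from the change rather than stated in the patch, is that Go map values of struct type are not addressable, so appending to a stored series requires a copy-and-write-back, while pointer entries can be mutated in place. A small sketch with a stripped-down, hypothetical `averageSeries`:

```go
package main

import "fmt"

// averageSeries is a stripped-down, hypothetical stand-in for the real type.
type averageSeries struct {
	Exemplars []float64
}

func main() {
	// Value-typed map entries are not addressable, so fields cannot be mutated in place:
	byValue := map[string]averageSeries{"series": {}}
	// byValue["series"].Exemplars = append(byValue["series"].Exemplars, 1)
	// ^ compile error: cannot assign to struct field byValue["series"].Exemplars in map
	s := byValue["series"]
	s.Exemplars = append(s.Exemplars, 1) // mutates a copy...
	byValue["series"] = s                // ...which must be written back

	// Pointer-typed entries can be mutated through the map directly:
	byPointer := map[string]*averageSeries{"series": {}}
	byPointer["series"].Exemplars = append(byPointer["series"].Exemplars, 1)

	fmt.Println(byValue["series"].Exemplars, byPointer["series"].Exemplars) // [1] [1]
}
```

With pointer values, the series created by `newAverageSeries` is stored once, and any later mutation through the map entry is visible everywhere that entry is read.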