Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TraceQL Metrics] Fix quantiles/histograms of generic duration attributes (i.e. traceDuration) #3879

Merged
merged 2 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* [BUGFIX] Fix metrics queries when grouping by attributes that may not exist [#3734](https://github.com/grafana/tempo/pull/3734) (@mdisibio)
* [BUGFIX] Fix frontend parsing error on cached responses [#3759](https://github.com/grafana/tempo/pull/3759) (@mdisibio)
* [BUGFIX] Fix autocomplete of a query using scoped intrinsics [#3865](https://github.com/grafana/tempo/pull/3865) (@mdisibio)
* [BUGFIX] Fix metrics query histograms and quantiles on traceDuration [#3879](https://github.com/grafana/tempo/pull/3879) (@mdisibio)
* [BUGFIX] max_global_traces_per_user: take into account ingestion.tenant_shard_size when converting to local limit [#3618](https://github.com/grafana/tempo/pull/3618) (@kvrhdn)
* [BUGFIX] Fix http connection reuse on GCP and AWS by reading io.EOF through the http body. [#3760](https://github.com/grafana/tempo/pull/3760) (@bmteller)
* [BUGFIX] Improved handling of complete blocks in localblocks processor after enabling flushing [#3805](https://github.com/grafana/tempo/pull/3805) (@mapno)
Expand Down
113 changes: 43 additions & 70 deletions pkg/traceql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1067,83 +1067,18 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
case metricsAggregateRate:
innerAgg = func() VectorAggregator { return NewRateAggregator(1.0 / time.Duration(q.Step).Seconds()) }

case metricsAggregateHistogramOverTime:
// Histograms are implemented as count_over_time() by(2^log2(attr)) for now
// This is very similar to quantile_over_time except the bucket values are the true
// underlying value in scale, i.e. a duration of 500ms will be in __bucket==0.512s
// The difference is that quantile_over_time has to calculate the final quantiles
// so in that case the log2 bucket number is more useful. We can clean it up later
// when updating quantiles to be smarter and more customizable range of buckets.
case metricsAggregateHistogramOverTime, metricsAggregateQuantileOverTime:
// Histograms and quantiles are implemented as count_over_time() by(2^log2(attr)) for now
// I.e. a duration of 500ms will be in __bucket==0.512s
innerAgg = func() VectorAggregator { return NewCountOverTimeAggregator() }
byFuncLabel = internalLabelBucket
switch a.attr {
case IntrinsicDurationAttribute:
// Optimal implementation for duration attribute
byFunc = func(s Span) (Static, bool) {
d := s.DurationNanos()
if d < 2 {
return NewStaticNil(), false
}
// Bucket is log2(nanos) converted to float seconds
return NewStaticFloat(Log2Bucketize(d) / float64(time.Second)), true
}
byFunc = a.bucketizeSpanDuration
default:
// Basic implementation for all other attributes
byFunc = func(s Span) (Static, bool) {
v, ok := s.AttributeFor(a.attr)
if !ok {
return NewStaticNil(), false
}

// TODO(mdisibio) - Add support for floats, we need to map them into buckets.
// Because of the range of floats, we need a native histogram approach.
n, ok := v.Int()
if !ok {
return NewStaticNil(), false
}

if n < 2 {
return NewStaticNil(), false
}
// Bucket is the value rounded up to the nearest power of 2
return NewStaticFloat(Log2Bucketize(uint64(n))), true
}
}

case metricsAggregateQuantileOverTime:
// Quantiles are implemented as count_over_time() by(log2(attr)) for now
innerAgg = func() VectorAggregator { return NewCountOverTimeAggregator() }
byFuncLabel = internalLabelBucket
switch a.attr {
case IntrinsicDurationAttribute:
// Optimal implementation for duration attribute
byFunc = func(s Span) (Static, bool) {
d := s.DurationNanos()
if d < 2 {
return NewStaticNil(), false
}
// Bucket is in seconds
return NewStaticFloat(Log2Bucketize(d) / float64(time.Second)), true
}
default:
// Basic implementation for all other attributes
byFunc = func(s Span) (Static, bool) {
v, ok := s.AttributeFor(a.attr)
if !ok {
return NewStaticNil(), false
}

// TODO(mdisibio) - Add support for floats, we need to map them into buckets.
// Because of the range of floats, we need a native histogram approach.
if v.Type != TypeInt {
return NewStaticNil(), false
}
n, _ := v.Int()
if n < 2 {
return NewStaticNil(), false
}
return NewStaticFloat(Log2Bucketize(uint64(n))), true
}
byFunc = a.bucketizeAttribute
}
}

Expand All @@ -1152,6 +1087,44 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
}, a.by, byFunc, byFuncLabel)
}

// bucketizeSpanDuration maps a span's duration onto a power-of-2 histogram
// bucket expressed in float seconds (e.g. 500ms lands in the 0.512s bucket).
// Durations below 2ns cannot be bucketized and are rejected.
func (a *MetricsAggregate) bucketizeSpanDuration(s Span) (Static, bool) {
	nanos := s.DurationNanos()
	if nanos >= 2 {
		// Log2Bucketize yields the bucket boundary in nanos; convert to seconds.
		return NewStaticFloat(Log2Bucketize(nanos) / float64(time.Second)), true
	}
	return NewStaticNil(), false
}

// bucketizeAttribute maps the value of the aggregate's attribute on the given
// span onto a power-of-2 histogram bucket. Integers bucketize to the value
// rounded up to the nearest power of 2; durations bucketize like span
// duration, i.e. log2 of the nanosecond count converted to float seconds.
// Missing attributes, unsupported types, and values below 2 are rejected.
func (a *MetricsAggregate) bucketizeAttribute(s Span) (Static, bool) {
	val, ok := s.AttributeFor(a.attr)
	if !ok {
		return NewStaticNil(), false
	}

	switch val.Type {
	case TypeInt:
		i, _ := val.Int()
		if i < 2 {
			return NewStaticNil(), false
		}
		// Bucket is the value rounded up to the nearest power of 2
		return NewStaticFloat(Log2Bucketize(uint64(i))), true
	case TypeDuration:
		dur, _ := val.Duration()
		if nanos := dur.Nanoseconds(); nanos >= 2 {
			// Bucket is log2(nanos) converted to float seconds
			return NewStaticFloat(Log2Bucketize(uint64(nanos)) / float64(time.Second)), true
		}
		return NewStaticNil(), false
	default:
		// TODO(mdisibio) - Add support for floats, we need to map them into buckets.
		// Because of the range of floats, we need a native histogram approach.
		return NewStaticNil(), false
	}
}

func (a *MetricsAggregate) initSum(q *tempopb.QueryRangeRequest) {
// Currently all metrics are summed by job to produce
// intermediate results. This will change when adding min/max/topk/etc
Expand Down
Loading