Skip to content

Commit

Permalink
[TraceQL Metrics] Add more instrumentation and a little cleanup (grafana#3388)
Browse files Browse the repository at this point in the history

* Update traceql metrics to use the trace-level timestamp columns conditionally

* comments

* Update benchmark, comment

* lint

* Change overlap cutoff to 20%

* add more instrumentation and a little cleanup

* Fix test
  • Loading branch information
mdisibio authored and Koenraad Verheyden committed Feb 26, 2024
1 parent 55cdef6 commit 2ee50b3
Show file tree
Hide file tree
Showing 9 changed files with 269 additions and 195 deletions.
5 changes: 4 additions & 1 deletion modules/frontend/query_range_sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,15 +217,18 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) {
res.Metrics.TotalBlockBytes = uint64(totalBlockBytes)

reqTime := time.Since(now)
throughput := float64(res.Metrics.InspectedBytes) / reqTime.Seconds()
throughput := math.Round(float64(res.Metrics.InspectedBytes) / reqTime.Seconds())
spanThroughput := math.Round(float64(res.Metrics.InspectedSpans) / reqTime.Seconds())

span.SetTag("totalBlocks", res.Metrics.TotalBlocks)
span.SetTag("inspectedBytes", res.Metrics.InspectedBytes)
span.SetTag("inspectedTraces", res.Metrics.InspectedTraces)
span.SetTag("inspectedSpans", res.Metrics.InspectedSpans)
span.SetTag("totalBlockBytes", res.Metrics.TotalBlockBytes)
span.SetTag("totalJobs", res.Metrics.TotalJobs)
span.SetTag("finishedJobs", res.Metrics.CompletedJobs)
span.SetTag("requestThroughput", throughput)
span.SetTag("spanThroughput", spanThroughput)

if jErr := jobErr.Load(); jErr != nil {
return s.respErrHandler(isProm, jErr)
Expand Down
5 changes: 5 additions & 0 deletions modules/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,11 @@ func (q *Querier) QueryRangeHandler(w http.ResponseWriter, r *http.Request) {
errHandler(ctx, span, err)
return
}

if resp != nil && resp.Metrics != nil {
span.SetTag("inspectedBytes", resp.Metrics.InspectedBytes)
span.SetTag("inspectedSpans", resp.Metrics.InspectedSpans)
}
}

func handleError(w http.ResponseWriter, err error) {
Expand Down
28 changes: 22 additions & 6 deletions modules/querier/querier_query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,6 @@ func (q *Querier) queryBackend(ctx context.Context, req *tempopb.QueryRangeReque
return nil, err
}

eval, err := traceql.NewEngine().CompileMetricsQueryRange(req, true)
if err != nil {
return nil, err
}

// Get blocks that overlap this time range
metas := q.store.BlockMetas(tenantID)
withinTimeRange := metas[:0]
Expand All @@ -78,6 +73,19 @@ func (q *Querier) queryBackend(ctx context.Context, req *tempopb.QueryRangeReque
}
}

if len(withinTimeRange) == 0 {
return nil, nil
}

// Optimization
// If there's only 1 block then dedupe is not needed.
dedupe := len(withinTimeRange) > 1

eval, err := traceql.NewEngine().CompileMetricsQueryRange(req, dedupe)
if err != nil {
return nil, err
}

wg := boundedwaitgroup.New(2)
jobErr := atomic.Error{}

Expand Down Expand Up @@ -119,7 +127,15 @@ func (q *Querier) queryBackend(ctx context.Context, req *tempopb.QueryRangeReque
return nil, err
}

return &tempopb.QueryRangeResponse{Series: queryRangeTraceQLToProto(res, req)}, nil
inspectedBytes, spansTotal, _ := eval.Metrics()

return &tempopb.QueryRangeResponse{
Series: queryRangeTraceQLToProto(res, req),
Metrics: &tempopb.SearchMetrics{
InspectedBytes: inspectedBytes,
InspectedSpans: spansTotal,
},
}, nil
}

func queryRangeTraceQLToProto(set traceql.SeriesSet, req *tempopb.QueryRangeRequest) []*tempopb.TimeSeries {
Expand Down
348 changes: 192 additions & 156 deletions pkg/tempopb/tempo.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/tempopb/tempo.proto
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ message SearchMetrics {
uint32 completedJobs = 4;
uint32 totalJobs = 5;
uint64 totalBlockBytes = 6;
uint64 inspectedSpans = 7;
}

message SearchTagsRequest {
Expand Down
2 changes: 2 additions & 0 deletions pkg/traceql/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ func (q *QueryRangeCombiner) Combine(resp *tempopb.QueryRangeResponse) {
q.metrics.TotalBlocks += resp.Metrics.TotalBlocks
q.metrics.TotalBlockBytes += resp.Metrics.TotalBlockBytes
q.metrics.InspectedBytes += resp.Metrics.InspectedBytes
q.metrics.InspectedTraces += resp.Metrics.InspectedTraces
q.metrics.InspectedSpans += resp.Metrics.InspectedSpans
}
}

Expand Down
44 changes: 26 additions & 18 deletions pkg/traceql/engine_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,32 +365,32 @@ func (e *Engine) CompileMetricsQueryRange(req *tempopb.QueryRangeRequest, dedupe
dedupeSpans: dedupeSpans,
}

// TraceID (not always required)
if req.ShardCount > 1 || dedupeSpans {
// TraceID (optional)
if req.ShardCount > 1 {
// For sharding it must be in the first pass so that we only evaluate our traces.
storageReq.ShardID = req.ShardID
storageReq.ShardCount = req.ShardCount
traceID := NewIntrinsic(IntrinsicTraceID)
if !storageReq.HasAttribute(traceID) {
storageReq.Conditions = append(storageReq.Conditions, Condition{Attribute: traceID})
if !storageReq.HasAttribute(IntrinsicTraceIDAttribute) {
storageReq.Conditions = append(storageReq.Conditions, Condition{Attribute: IntrinsicTraceIDAttribute})
}
}

// Span start time (always required)
startTime := NewIntrinsic(IntrinsicSpanStartTime)
if !storageReq.HasAttribute(startTime) {
if storageReq.AllConditions {
// The most efficient case. We can add it to the primary pass
// without affecting correctness. And this lets us avoid the
// entire second pass.
storageReq.Conditions = append(storageReq.Conditions, Condition{Attribute: startTime})
} else {
// Complex query with a second pass. In this case it is better to
// add it to the second pass so that it's only returned for the matches.
storageReq.SecondPassConditions = append(storageReq.SecondPassConditions, Condition{Attribute: startTime})
if dedupeSpans {
// For dedupe we only need the trace ID on matching spans, so it can go in the second pass.
// This is a no-op if we are already sharding and it's in the first pass.
// Finally, this is often optimized back to the first pass when it lets us avoid a second pass altogether.
if !storageReq.HasAttribute(IntrinsicTraceIDAttribute) {
storageReq.SecondPassConditions = append(storageReq.SecondPassConditions, Condition{Attribute: IntrinsicTraceIDAttribute})
}
}

// Span start time (always required)
if !storageReq.HasAttribute(IntrinsicSpanStartTimeAttribute) {
// Technically we only need the start time of matching spans, so we add it to the second pass.
// However this is often optimized back to the first pass when it lets us avoid a second pass altogether.
storageReq.SecondPassConditions = append(storageReq.SecondPassConditions, Condition{Attribute: IntrinsicSpanStartTimeAttribute})
}

// Timestamp filtering
// (1) Include any overlapping trace
// It can be faster to skip the trace-level timestamp check
Expand All @@ -399,6 +399,11 @@ func (e *Engine) CompileMetricsQueryRange(req *tempopb.QueryRangeRequest, dedupe
// (2) Only include spans that started in this time frame.
// This is checked outside the fetch layer in the evaluator. Timestamp
// is only checked on the spans that are the final results.
// TODO - I think there are cases where we can push this down.
// Queries like {status=error} | rate() don't assert inter-span conditions
// and we could filter on span start time without affecting correctness.
// Queries where we can't are like: {A} >> {B} | rate() because we only require
// that {B} occurs within our time range but {A} is allowed to occur any time.
me.checkTime = true
me.start = req.Start
me.end = req.End
Expand All @@ -422,7 +427,10 @@ func (e *Engine) CompileMetricsQueryRange(req *tempopb.QueryRangeRequest, dedupe

// optimize numerous things within the request that is specific to metrics.
func optimize(req *FetchSpansRequest) {
// Special optimization for queries like {} | rate() by (rootName)
// Special optimization for queries like:
// {} | rate()
// {} | rate() by (rootName)
// {} | rate() by (resource.service.name)
// When the second pass consists only of intrinsics, then it's possible to
// move them to the first pass and increase performance. It avoids the second pass/bridge
// layer and doesn't alter the correctness of the query.
Expand Down
29 changes: 15 additions & 14 deletions pkg/traceql/engine_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestCompileMetricsQueryRangeFetchSpansRequest(t *testing.T) {
Conditions: []Condition{
{
// In this case start time is in the first pass
Attribute: NewIntrinsic(IntrinsicSpanStartTime),
Attribute: IntrinsicSpanStartTimeAttribute,
},
},
},
Expand All @@ -201,10 +201,10 @@ func TestCompileMetricsQueryRangeFetchSpansRequest(t *testing.T) {
AllConditions: true,
Conditions: []Condition{
{
Attribute: NewIntrinsic(IntrinsicSpanStartTime),
Attribute: IntrinsicSpanStartTimeAttribute,
},
{
Attribute: NewIntrinsic(IntrinsicTraceID), // Required for dedupe
Attribute: IntrinsicTraceIDAttribute, // Required for dedupe
},
},
},
Expand All @@ -219,22 +219,23 @@ func TestCompileMetricsQueryRangeFetchSpansRequest(t *testing.T) {
ShardCount: 456,
Conditions: []Condition{
{
Attribute: NewIntrinsic(IntrinsicDuration),
Attribute: IntrinsicDurationAttribute,
Op: OpGreater,
Operands: Operands{NewStaticDuration(10 * time.Second)},
},
{
Attribute: NewIntrinsic(IntrinsicTraceID), // Required for sharding
},
{
Attribute: NewIntrinsic(IntrinsicSpanStartTime),
Attribute: IntrinsicTraceIDAttribute, // Required for sharding
},
},
SecondPassConditions: []Condition{
{
// Group-by attributes (non-intrinsic) must be in the second pass
Attribute: NewScopedAttribute(AttributeScopeResource, false, "cluster"),
},
{
// Since there is already a second pass then span start time isn't optimized to the first pass.
Attribute: IntrinsicSpanStartTimeAttribute,
},
},
},
},
Expand All @@ -248,24 +249,24 @@ func TestCompileMetricsQueryRangeFetchSpansRequest(t *testing.T) {
ShardCount: 456,
Conditions: []Condition{
{
Attribute: NewIntrinsic(IntrinsicDuration),
Attribute: IntrinsicDurationAttribute,
Op: OpGreater,
Operands: Operands{NewStaticDuration(10 * time.Second)},
},
{
Attribute: NewIntrinsic(IntrinsicTraceID), // Required for sharding
},
{
Attribute: NewIntrinsic(IntrinsicSpanStartTime),
Attribute: IntrinsicTraceIDAttribute, // Required for sharding
},
{
// Intrinsic moved to first pass
Attribute: NewIntrinsic(IntrinsicName),
Attribute: IntrinsicNameAttribute,
},
{
// Resource service name is treated as an intrinsic and moved to the first pass
Attribute: NewScopedAttribute(AttributeScopeResource, false, "service.name"),
},
{
Attribute: IntrinsicSpanStartTimeAttribute,
},
},
},
},
Expand Down
2 changes: 2 additions & 0 deletions pkg/traceql/enum_attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ var (
IntrinsicStatusMessageAttribute = NewIntrinsic(IntrinsicStatusMessage)
IntrinsicKindAttribute = NewIntrinsic(IntrinsicKind)
IntrinsicChildCountAttribute = NewIntrinsic(IntrinsicChildCount)
IntrinsicTraceIDAttribute = NewIntrinsic(IntrinsicTraceID)
IntrinsicTraceRootServiceAttribute = NewIntrinsic(IntrinsicTraceRootService)
IntrinsicTraceRootSpanAttribute = NewIntrinsic(IntrinsicTraceRootSpan)
IntrinsicTraceDurationAttribute = NewIntrinsic(IntrinsicTraceDuration)
IntrinsicSpanStartTimeAttribute = NewIntrinsic(IntrinsicSpanStartTime)
)

func (i Intrinsic) String() string {
Expand Down

0 comments on commit 2ee50b3

Please sign in to comment.