Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TraceQL] Implement structural operators #2625

Merged
merged 20 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* [CHANGE] Make vParquet2 the default block format [#2526](https://github.com/grafana/tempo/pull/2526) (@stoewer)
* [CHANGE] Disable tempo-query by default in Jsonnet libs. [#2462](https://github.com/grafana/tempo/pull/2462) (@electron0zero)
* [FEATURE] New experimental API to derive on-demand RED metrics grouped by any attribute, and new metrics generator processor [#2368](https://github.com/grafana/tempo/pull/2368) [#2418](https://github.com/grafana/tempo/pull/2418) [#2424](https://github.com/grafana/tempo/pull/2424) [#2442](https://github.com/grafana/tempo/pull/2442) [#2480](https://github.com/grafana/tempo/pull/2480) [#2481](https://github.com/grafana/tempo/pull/2481) [#2501](https://github.com/grafana/tempo/pull/2501) [#2579](https://github.com/grafana/tempo/pull/2579) [#2582](https://github.com/grafana/tempo/pull/2582) (@mdisibio @zalegrala)
* [FEATURE] New TraceQL structural operators descendant (>>), child (>), and sibling (~) [#2625](https://github.com/grafana/tempo/pull/2625) (@mdisibio)
* [ENHANCEMENT] Add capability to flush all remaining traces to backend when ingester is stopped [#2538](https://github.com/grafana/tempo/pull/2538)
* [ENHANCEMENT] Fill parent ID column and nested set columns [#2487](https://github.com/grafana/tempo/pull/2487) (@stoewer)
* [ENHANCEMENT] Add metrics generator config option to allow customizable ring port [#2399](https://github.com/grafana/tempo/pull/2399) (@mdisibio)
Expand Down
23 changes: 20 additions & 3 deletions docs/sources/tempo/traceql/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,14 @@ In the above example, if a span includes an `.http.method` attribute set to `DEL

## Combining spansets

Spanset operators let you combine two sets of spans using and (`&&`) as well as union (`||`).
Spanset operators let you select different sets of spans from a trace and then make a determination between them.

- `{condA} && {condB}`
- `{condA} || {condB}`
### Logical

These spanset operators perform logical checks between the sets of spans.

- `{condA} && {condB}` - The and operator (`&&`) checks that both conditions found matches.
- `{condA} || {condB}` - The union operator (`||`) checks that either condition found matches.

For example, to find a trace that went through two specific `cloud.region`:

Expand All @@ -195,6 +198,20 @@ Note the difference between the previous example and this one:

The second expression returns no traces because it's impossible for a single span to have a `resource.cloud.region` attribute that is set to both region values at the same time.

### Structural

These spanset operators look at the structure of a trace and the relationship between the spans.

- `{condA} >> {condB}` - The descendant operator (`>>`) looks for spans matching `{condB}` that are descendants of a span matching `{condA}`
- `{condA} > {condB}` - The child operator (`>`) looks for spans matching `{condB}` that are direct child spans of a parent matching `{condA}`
- `{condA} ~ {condB}` - The sibling operator (`~`) checks that spans matching `{condA}` and `{condB}` are siblings of the same parent span.

For example, to find a trace where a specific HTTP API interacted with a specific database:

```
{ span.http.url = "/path/of/api" } >> { span.db.name = "db-shard-001" }
```

## Aggregators

So far, all of the example queries expressions have been about individual spans. You can use aggregate functions to ask questions about a set of spans. These currently consist of:
Expand Down
44 changes: 31 additions & 13 deletions pkg/traceql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ func (o GroupOperation) extractConditions(request *FetchSpansRequest) {
o.Expression.extractConditions(request)
}

type CoalesceOperation struct {
}
type CoalesceOperation struct{}

func newCoalesceOperation() CoalesceOperation {
return CoalesceOperation{}
Expand All @@ -143,7 +142,7 @@ func newSelectOperation(exprs []FieldExpression) SelectOperation {
// Scalars
// **********************
type ScalarExpression interface {
//pipelineElement
// pipelineElement
Element
typedExpression
__scalarExpression()
Expand Down Expand Up @@ -227,14 +226,31 @@ type SpansetExpression interface {
}

type SpansetOperation struct {
Op Operator
LHS SpansetExpression
RHS SpansetExpression
Op Operator
LHS SpansetExpression
RHS SpansetExpression
matchingSpansBuffer []Span
}

func (o SpansetOperation) extractConditions(request *FetchSpansRequest) {
o.LHS.extractConditions(request)
o.RHS.extractConditions(request)

switch o.Op {
case OpSpansetDescendant:
request.Conditions = append(request.Conditions, Condition{
Attribute: NewIntrinsic(IntrinsicStructuralDescendant),
})
case OpSpansetChild:
request.Conditions = append(request.Conditions, Condition{
Attribute: NewIntrinsic(IntrinsicStructuralChild),
})
case OpSpansetSibling:
request.Conditions = append(request.Conditions, Condition{
Attribute: NewIntrinsic(IntrinsicStructuralSibling),
})
}

request.AllConditions = false
}

Expand Down Expand Up @@ -688,10 +704,12 @@ func NewIntrinsic(n Intrinsic) Attribute {
}
}

var _ pipelineElement = (*Pipeline)(nil)
var _ pipelineElement = (*Aggregate)(nil)
var _ pipelineElement = (*SpansetOperation)(nil)
var _ pipelineElement = (*SpansetFilter)(nil)
var _ pipelineElement = (*CoalesceOperation)(nil)
var _ pipelineElement = (*ScalarFilter)(nil)
var _ pipelineElement = (*GroupOperation)(nil)
var (
_ pipelineElement = (*Pipeline)(nil)
_ pipelineElement = (*Aggregate)(nil)
_ pipelineElement = (*SpansetOperation)(nil)
_ pipelineElement = (*SpansetFilter)(nil)
_ pipelineElement = (*CoalesceOperation)(nil)
_ pipelineElement = (*ScalarFilter)(nil)
_ pipelineElement = (*GroupOperation)(nil)
)
94 changes: 92 additions & 2 deletions pkg/traceql/ast_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strings"
)

var errSpansetOperationMultiple = errors.New("spanset operators are not supported for multiple spansets per trace. consider using coalesce()")

func (g GroupOperation) evaluate(ss []*Spanset) ([]*Spanset, error) {
result := make([]*Spanset, 0, len(ss))
groups := g.groupBuffer
Expand Down Expand Up @@ -70,7 +72,6 @@ func (CoalesceOperation) evaluate(ss []*Spanset) ([]*Spanset, error) {
}

func (o SpansetOperation) evaluate(input []*Spanset) (output []*Spanset, err error) {

for i := range input {
curr := input[i : i+1]

Expand Down Expand Up @@ -99,6 +100,54 @@ func (o SpansetOperation) evaluate(input []*Spanset) (output []*Spanset, err err
output = append(output, matchingSpanset)
}

case OpSpansetDescendant:
spans, err := o.joinSpansets(lhs, rhs, func(l, r Span) bool {
return r.DescendantOf(l)
})
if err != nil {
return nil, err
}

if len(spans) > 0 {
// Clone here to capture previously computed aggregates, grouped attrs, etc.
// Copy spans to new slice because of internal buffering.
matchingSpanset := input[i].clone()
matchingSpanset.Spans = append([]Span(nil), spans...)
output = append(output, matchingSpanset)
}

case OpSpansetChild:
spans, err := o.joinSpansets(lhs, rhs, func(l, r Span) bool {
return r.ChildOf(l)
})
if err != nil {
return nil, err
}

if len(spans) > 0 {
// Clone here to capture previously computed aggregates, grouped attrs, etc.
// Copy spans to new slice because of internal buffering.
matchingSpanset := input[i].clone()
matchingSpanset.Spans = append([]Span(nil), spans...)
output = append(output, matchingSpanset)
}

case OpSpansetSibling:
spans, err := o.joinSpansets(lhs, rhs, func(l, r Span) bool {
return r.SiblingOf(l)
})
if err != nil {
return nil, err
}

if len(spans) > 0 {
// Clone here to capture previously computed aggregates, grouped attrs, etc.
// Copy spans to new slice because of internal buffering.
matchingSpanset := input[i].clone()
matchingSpanset.Spans = append([]Span(nil), spans...)
output = append(output, matchingSpanset)
}

default:
return nil, fmt.Errorf("spanset operation (%v) not supported", o.Op)
}
Expand All @@ -107,13 +156,54 @@ func (o SpansetOperation) evaluate(input []*Spanset) (output []*Spanset, err err
return output, nil
}

// joinSpansets compares all pairwise combinations of the inputs and returns the right-hand side
// where the eval callback returns true. For now the behavior is only defined when there is exactly one
// spanset on both sides and will return an error if multiple spansets are present.
func (o *SpansetOperation) joinSpansets(lhs, rhs []*Spanset, eval func(l, r Span) bool) ([]Span, error) {
if len(lhs) < 1 || len(rhs) < 1 {
return nil, nil
}

if len(lhs) > 1 || len(rhs) > 1 {
return nil, errSpansetOperationMultiple
}

return o.joinSpansAndReturnRHS(lhs[0].Spans, rhs[0].Spans, eval), nil
}

// joinSpansAndReturnRHS compares all pairwise combinations of the inputs and returns the right-hand side
// spans where the eval callback returns true. Uses and internal buffer and output is only valid until
// the next call. Destructively edits the RHS slice for performance.
func (o *SpansetOperation) joinSpansAndReturnRHS(lhs, rhs []Span, eval func(l, r Span) bool) []Span {
if len(lhs) == 0 || len(rhs) == 0 {
return nil
}

o.matchingSpansBuffer = o.matchingSpansBuffer[:0]

for _, l := range lhs {
for i, r := range rhs {
if r == nil {
// Already matched
continue
}
if eval(l, r) {
// Returns RHS
o.matchingSpansBuffer = append(o.matchingSpansBuffer, r)
rhs[i] = nil // No need to check this span again
}
}
}

return o.matchingSpansBuffer
}

// SelectOperation evaluate is a no-op b/c the fetch layer has already decorated the spans with the requested attributes
func (o SelectOperation) evaluate(input []*Spanset) (output []*Spanset, err error) {
return input, nil
}

func (f ScalarFilter) evaluate(input []*Spanset) (output []*Spanset, err error) {

// TODO we solve this gap where pipeline elements and scalar binary
// operations meet in a generic way. For now we only support well-defined
// case: aggregate binop static
Expand Down
Loading