Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TraceQL] Implement structural operators #2625

Merged
merged 20 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions pkg/traceql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ func (o GroupOperation) extractConditions(request *FetchSpansRequest) {
o.Expression.extractConditions(request)
}

type CoalesceOperation struct {
}
type CoalesceOperation struct{}

func newCoalesceOperation() CoalesceOperation {
return CoalesceOperation{}
Expand All @@ -143,7 +142,7 @@ func newSelectOperation(exprs []FieldExpression) SelectOperation {
// Scalars
// **********************
type ScalarExpression interface {
//pipelineElement
// pipelineElement
Element
typedExpression
__scalarExpression()
Expand Down Expand Up @@ -235,6 +234,22 @@ type SpansetOperation struct {
func (o SpansetOperation) extractConditions(request *FetchSpansRequest) {
o.LHS.extractConditions(request)
o.RHS.extractConditions(request)

switch o.Op {
case OpSpansetDescendant:
request.Conditions = append(request.Conditions, Condition{
Attribute: NewIntrinsic(IntrinsicStructuralDescendant),
})
case OpSpansetChild:
request.Conditions = append(request.Conditions, Condition{
Attribute: NewIntrinsic(IntrinsicStructuralChild),
})
case OpSpansetSibling:
request.Conditions = append(request.Conditions, Condition{
Attribute: NewIntrinsic(IntrinsicStructuralSibling),
})
}

request.AllConditions = false
}

Expand Down Expand Up @@ -688,10 +703,12 @@ func NewIntrinsic(n Intrinsic) Attribute {
}
}

var _ pipelineElement = (*Pipeline)(nil)
var _ pipelineElement = (*Aggregate)(nil)
var _ pipelineElement = (*SpansetOperation)(nil)
var _ pipelineElement = (*SpansetFilter)(nil)
var _ pipelineElement = (*CoalesceOperation)(nil)
var _ pipelineElement = (*ScalarFilter)(nil)
var _ pipelineElement = (*GroupOperation)(nil)
var (
_ pipelineElement = (*Pipeline)(nil)
_ pipelineElement = (*Aggregate)(nil)
_ pipelineElement = (*SpansetOperation)(nil)
_ pipelineElement = (*SpansetFilter)(nil)
_ pipelineElement = (*CoalesceOperation)(nil)
_ pipelineElement = (*ScalarFilter)(nil)
_ pipelineElement = (*GroupOperation)(nil)
)
68 changes: 66 additions & 2 deletions pkg/traceql/ast_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func (CoalesceOperation) evaluate(ss []*Spanset) ([]*Spanset, error) {
}

func (o SpansetOperation) evaluate(input []*Spanset) (output []*Spanset, err error) {

for i := range input {
curr := input[i : i+1]

Expand Down Expand Up @@ -99,6 +98,72 @@ func (o SpansetOperation) evaluate(input []*Spanset) (output []*Spanset, err err
output = append(output, matchingSpanset)
}

case OpSpansetDescendant:
if len(lhs) > 0 && len(rhs) > 0 {
// Clone here to capture previously computed aggregates, grouped attrs, etc.
matchingSpanset := input[i].clone()
matchingSpanset.Spans = nil
// TODO: In what situations do lhs and rhs have more than one spanset,
// and what should the result be?
for _, l := range lhs[0].Spans {
for i, r := range rhs[0].Spans {
if r == nil {
continue
}
if r.DescendantOf(l) {
// Returns RHS
matchingSpanset.Spans = append(matchingSpanset.Spans, r)
rhs[0].Spans[i] = nil // No need to check this span again
}
}
}
output = append(output, matchingSpanset)
}

case OpSpansetChild:
if len(lhs) > 0 && len(rhs) > 0 {
// Clone here to capture previously computed aggregates, grouped attrs, etc.
matchingSpanset := input[i].clone()
matchingSpanset.Spans = nil
// TODO: In what situations do lhs and rhs have more than one spanset,
// and what should the result be?
for _, l := range lhs[0].Spans {
for i, r := range rhs[0].Spans {
if r == nil {
continue
}
if r.ChildOf(l) {
// Returns RHS
matchingSpanset.Spans = append(matchingSpanset.Spans, r)
rhs[0].Spans[i] = nil // No need to check this span again
}
}
}
output = append(output, matchingSpanset)
}

case OpSpansetSibling:
if len(lhs) > 0 && len(rhs) > 0 {
// Clone here to capture previously computed aggregates, grouped attrs, etc.
matchingSpanset := input[i].clone()
matchingSpanset.Spans = nil
// TODO: In what situations do lhs and rhs have more than one spanset,
// and what should the result be?
for _, l := range lhs[0].Spans {
for i, r := range rhs[0].Spans {
if r == nil {
continue
}
if r.SiblingOf(l) {
// Returns RHS
matchingSpanset.Spans = append(matchingSpanset.Spans, r)
rhs[0].Spans[i] = nil // No need to check this span again
}
}
}
output = append(output, matchingSpanset)
}

default:
return nil, fmt.Errorf("spanset operation (%v) not supported", o.Op)
}
Expand All @@ -113,7 +178,6 @@ func (o SelectOperation) evaluate(input []*Spanset) (output []*Spanset, err erro
}

func (f ScalarFilter) evaluate(input []*Spanset) (output []*Spanset, err error) {

// TODO we solve this gap where pipeline elements and scalar binary
// operations meet in a generic way. For now we only support well-defined
// case: aggregate binop static
Expand Down
37 changes: 34 additions & 3 deletions pkg/traceql/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ func TestPipelineEvaluate(t *testing.T) {
},
[]*Spanset{
{Spans: []Span{
&mockSpan{attributes: map[Attribute]Static{NewAttribute("foo1"): NewStaticString("a"), NewAttribute("foo2"): NewStaticString("b")}}}},
&mockSpan{attributes: map[Attribute]Static{NewAttribute("foo1"): NewStaticString("a"), NewAttribute("foo2"): NewStaticString("b")}},
}},
},
},
}
Expand Down Expand Up @@ -183,7 +184,8 @@ func TestSpansetFilterEvaluate(t *testing.T) {
},
[]*Spanset{
{Spans: []Span{
&mockSpan{attributes: map[Attribute]Static{NewAttribute("foo"): NewStaticString("a")}}}},
&mockSpan{attributes: map[Attribute]Static{NewAttribute("foo"): NewStaticString("a")}},
}},
},
},
{
Expand Down Expand Up @@ -274,7 +276,8 @@ func TestSpansetFilterEvaluate(t *testing.T) {
},
[]*Spanset{
{Spans: []Span{
&mockSpan{attributes: map[Attribute]Static{NewAttribute("foo"): NewStaticInt(1)}}}},
&mockSpan{attributes: map[Attribute]Static{NewAttribute("foo"): NewStaticInt(1)}},
}},
{Spans: []Span{
&mockSpan{attributes: map[Attribute]Static{NewAttribute("foo"): NewStaticInt(4)}},
&mockSpan{attributes: map[Attribute]Static{NewAttribute("foo"): NewStaticInt(5)}},
Expand Down Expand Up @@ -345,17 +348,45 @@ type mockSpan struct {
startTimeUnixNanos uint64
durationNanos uint64
attributes map[Attribute]Static

nestedsetid, left, right int
}

func (m *mockSpan) WithNestedSetInfo(id, left, right int) *mockSpan {
m.nestedsetid = id
m.left = left
m.right = right
return m
}

func (m *mockSpan) Attributes() map[Attribute]Static {
return m.attributes
}

func (m *mockSpan) ID() []byte {
return m.id
}

func (m *mockSpan) StartTimeUnixNanos() uint64 {
return m.startTimeUnixNanos
}

func (m *mockSpan) DurationNanos() uint64 {
return m.durationNanos
}

func (m *mockSpan) DescendantOf(s Span) bool {
if ss, ok := s.(*mockSpan); ok {
return m.nestedsetid > ss.left && m.nestedsetid < ss.right
}

return false
}

func (m *mockSpan) SiblingOf(Span) bool {
return false
}

func (m *mockSpan) ChildOf(Span) bool {
return false
}
11 changes: 1 addition & 10 deletions pkg/traceql/ast_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,12 @@ func (a Aggregate) validate() error {
}

func (o SpansetOperation) validate() error {
// TODO validate operator is a SpanSetOperator
if err := o.LHS.validate(); err != nil {
return err
}
if err := o.RHS.validate(); err != nil {
return err
}

// supported spanset operations
switch o.Op {
case OpSpansetChild, OpSpansetDescendant, OpSpansetSibling:
return newUnsupportedError(fmt.Sprintf("spanset operation (%v)", o.Op))
}

return nil
}

Expand Down Expand Up @@ -225,8 +217,7 @@ func (a Attribute) validate() error {
return newUnsupportedError("parent")
}
switch a.Intrinsic {
case IntrinsicParent,
IntrinsicChildCount:
case IntrinsicParent, IntrinsicChildCount:
return newUnsupportedError(fmt.Sprintf("intrinsic (%v)", a.Intrinsic))
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/traceql/enum_attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ const (
// not yet implemented in traceql but will be
IntrinsicParent

IntrinsicStructuralDescendant
IntrinsicStructuralSibling
IntrinsicStructuralChild

// not yet implemented in traceql and may never be. these exist so that we can retrieve
// these fields from the fetch layer
IntrinsicTraceID
Expand Down
4 changes: 4 additions & 0 deletions pkg/traceql/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ type Span interface {
ID() []byte
StartTimeUnixNanos() uint64
DurationNanos() uint64

SiblingOf(Span) bool
DescendantOf(Span) bool
ChildOf(Span) bool
}

// should we just make matched a field on the spanset instead of a special attribute?
Expand Down
13 changes: 6 additions & 7 deletions pkg/traceql/test_examples.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ valid:
# spanset expressions
- '{ true } && { true }'
- '{ true } || { true }'
- '{ true } >> { true }'
- '{ true } > { true }'
- '{ true } ~ { true }'
- '({ true } | count() > 1 | { false }) >> ({ true } | count() > 1 | { false })'
- '({ true } | count() > 1 | { false }) > ({ true } | count() > 1 | { false })'
- '({ true } | count() > 1 | { false }) ~ ({ true } | count() > 1 | { false })'
# scalar filters
- 'avg(.field) > 1'
- 'max(duration) >= 1s'
Expand Down Expand Up @@ -249,13 +255,6 @@ unsupported:
- '{ 1 = childCount }'
# childCount - will be invalid when supported
- '{ "foo" = childCount }'
# spanset operations - will be valid when supported
- '{ true } >> { true }'
- '{ true } > { true }'
- '{ true } ~ { true }'
- '({ true } | count() > 1 | { false }) >> ({ true } | count() > 1 | { false })'
- '({ true } | count() > 1 | { false }) > ({ true } | count() > 1 | { false })'
- '({ true } | count() > 1 | { false }) ~ ({ true } | count() > 1 | { false })'
# spanset pipelines + scalar filters - will be valid when supported
- '{ true } | count() + count() = 1'
- '({ true } | count()) + ({ true } | count()) = 1'
Expand Down
10 changes: 7 additions & 3 deletions pkg/traceqlmetrics/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ func (m *mockSpan) WithDuration(d uint64) *mockSpan {
}

func (m *mockSpan) WithAttributes(nameValuePairs ...string) *mockSpan {

for i := 0; i < len(nameValuePairs); i += 2 {
attr := traceql.MustParseIdentifier(nameValuePairs[i])
value := traceql.NewStaticString(nameValuePairs[i+1])
Expand All @@ -50,14 +49,19 @@ func (m *mockSpan) Attributes() map[traceql.Attribute]traceql.Static { return m.
func (m *mockSpan) ID() []byte { return nil }
func (m *mockSpan) StartTimeUnixNanos() uint64 { return m.start }
func (m *mockSpan) DurationNanos() uint64 { return m.duration }
func (m *mockSpan) DescendantOf(s traceql.Span) bool { return false }
func (m *mockSpan) SiblingOf(traceql.Span) bool { return false }
func (m *mockSpan) ChildOf(traceql.Span) bool { return false }

type mockFetcher struct {
filter traceql.SecondPassFn
Spansets []*traceql.Spanset
}

var _ traceql.SpansetFetcher = (*mockFetcher)(nil)
var _ traceql.SpansetIterator = (*mockFetcher)(nil)
var (
_ traceql.SpansetFetcher = (*mockFetcher)(nil)
_ traceql.SpansetIterator = (*mockFetcher)(nil)
)

func (m *mockFetcher) Fetch(_ context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) {
m.filter = req.SecondPass
Expand Down
Loading