From 6a6a7d1e8a5750502c853e619c0048dab4082225 Mon Sep 17 00:00:00 2001 From: "A. Stoewer" Date: Tue, 16 Jul 2024 11:24:58 +1000 Subject: [PATCH] Refactoring: implement arrays for traceql.Static with reused fields (#3827) * Refactore traceql.Static implementation with reused struct fields The new implementation is meant to support static values representing []int, []float, []bool, and []string in the future without increasing the struct size * Fix common errors with new traceql.Static in traceql pkg * Use traceql.StaticMapKey in maps instead of traceql.Static * Fix bug with .foo!=nil * Static value access methods return bolean ok value, not error * Use traceql.StaticMapKey as map key in traceqlmetrics * Adjust use of traceql.Static in vp2 * Adjust use of traceql.Static in vp3 * Add method Static.StrictEquals(o) * Adjust use of traceql.Static in vp4 * Use traceql.StaticMapKey as map key in tempodb * Fix linter warnings * Add test for Static.MapKey() * Add support for string arrays to traceql.Static * Add support for float and bool arrays to traceql.Static * Create static nil values by calling traceql.NewStaticNil() * Remove TODOs and panics * Replace more Static{} with traceql.NewStaticNil() * Improve string representation for arrays * CHANGELOG.md * Remove redundant type check * Use fnv1a instead of crc32 to calculate StaticMapKey hash --------- Co-authored-by: Mario --- CHANGELOG.md | 1 + .../processor/localblocks/processor.go | 53 +- modules/querier/querier.go | 55 +- pkg/traceql/ast.go | 501 +++++++++----- pkg/traceql/ast_conditions.go | 6 +- pkg/traceql/ast_execute.go | 168 +++-- pkg/traceql/ast_stringer.go | 71 +- pkg/traceql/ast_test.go | 627 ++++++++++++++++-- pkg/traceql/ast_validate.go | 6 +- pkg/traceql/engine.go | 43 +- pkg/traceql/engine_metrics.go | 111 ++-- pkg/traceql/engine_metrics_compare.go | 81 ++- pkg/traceql/enum_hints.go | 14 +- pkg/traceql/enum_statics.go | 4 + pkg/traceql/util.go | 2 +- pkg/traceqlmetrics/metrics.go | 60 +- pkg/traceqlmetrics/metrics_test.go | 47 +- tempodb/encoding/vparquet2/block_traceql.go | 71 +- .../encoding/vparquet3/block_autocomplete.go | 27 +- tempodb/encoding/vparquet3/block_traceql.go | 81 ++- .../encoding/vparquet3/coalesce_conditions.go | 2 +- .../encoding/vparquet4/block_autocomplete.go | 27 +- tempodb/encoding/vparquet4/block_traceql.go | 85 ++- .../encoding/vparquet4/coalesce_conditions.go | 2 +- tempodb/tempodb_search_test.go | 11 +- 25 files changed, 1531 insertions(+), 625 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ae84741500..80ad91e00fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * [FEATURE] Flush and query RF1 blocks for TraceQL metric queries [#3628](https://github.com/grafana/tempo/pull/3628) [#3691](https://github.com/grafana/tempo/pull/3691) [#3723](https://github.com/grafana/tempo/pull/3723) (@mapno) * [FEATURE] Add new compare() metrics function [#3695](https://github.com/grafana/tempo/pull/3695) (@mdisibio) * [FEATURE] Add a `q` parameter to `/api/v2/serach/tags` for tag name filtering [#3822](https://github.com/grafana/tempo/pull/3822) (@joe-elliott) +* [ENHANCEMENT] Implement arrays for traceql.Static with reused fields [#3827](https://github.com/grafana/tempo/pull/3827) (@stoewer) * [ENHANCEMENT] Tag value lookup use protobuf internally for improved latency [#3731](https://github.com/grafana/tempo/pull/3731) (@mdisibio) * [ENHANCEMENT] TraceQL metrics queries use protobuf internally for improved latency [#3745](https://github.com/grafana/tempo/pull/3745) (@mdisibio) * [ENHANCEMENT] Add local disk caching of metrics queries in local-blocks processor [#3799](https://github.com/grafana/tempo/pull/3799) (@mdisibio) diff --git a/modules/generator/processor/localblocks/processor.go b/modules/generator/processor/localblocks/processor.go index 8aa6079f532..68f8396cc0f 100644 --- a/modules/generator/processor/localblocks/processor.go +++ b/modules/generator/processor/localblocks/processor.go @@ -466,10 +466,10 @@ func (p *Processor) GetMetrics(ctx context.Context, req *tempopb.SpanMetricsRequ var rawHistorgram *tempopb.RawHistogram var errCount int - for series, hist := range m.Series { + for keys, sh := range m.Series { h := []*tempopb.RawHistogram{} - for bucket, count := range hist.Buckets() { + for bucket, count := range sh.Histogram.Buckets() { if count != 0 { rawHistorgram = &tempopb.RawHistogram{ Bucket: uint64(bucket), @@ -481,13 +481,13 @@ func (p *Processor) GetMetrics(ctx context.Context, req *tempopb.SpanMetricsRequ } errCount = 0 - if errs, ok := m.Errors[series]; ok { + if errs, ok := m.Errors[keys]; ok { errCount = errs } resp.Metrics = append(resp.Metrics, &tempopb.SpanMetrics{ LatencyHistogram: h, - Series: metricSeriesToProto(series), + Series: metricSeriesToProto(sh.Series), Errors: uint64(errCount), }) } @@ -802,25 +802,42 @@ func metricSeriesToProto(series traceqlmetrics.MetricSeries) []*tempopb.KeyValue var r []*tempopb.KeyValue for _, kv := range series { if kv.Key != "" { - static := kv.Value - r = append(r, &tempopb.KeyValue{ - Key: kv.Key, - Value: &tempopb.TraceQLStatic{ - Type: int32(static.Type), - N: int64(static.N), - F: static.F, - S: static.S, - B: static.B, - D: uint64(static.D), - Status: int32(static.Status), - Kind: int32(static.Kind), - }, - }) + r = append(r, traceQLStaticToProto(&kv)) } } return r } +func traceQLStaticToProto(kv *traceqlmetrics.KeyValue) *tempopb.KeyValue { + val := tempopb.TraceQLStatic{Type: int32(kv.Value.Type)} + + switch kv.Value.Type { + case traceql.TypeInt: + n, _ := kv.Value.Int() + val.N = int64(n) + case traceql.TypeFloat: + val.F = kv.Value.Float() + case traceql.TypeString: + val.S = kv.Value.EncodeToString(false) + case traceql.TypeBoolean: + b, _ := kv.Value.Bool() + val.B = b + case traceql.TypeDuration: + d, _ := kv.Value.Duration() + val.D = uint64(d) + case traceql.TypeStatus: + st, _ := kv.Value.Status() + val.Status = int32(st) + case traceql.TypeKind: + k, _ := kv.Value.Kind() + val.Kind = int32(k) + default: + val = tempopb.TraceQLStatic{Type: int32(traceql.TypeNil)} + } + + return &tempopb.KeyValue{Key: kv.Key, Value: &val} +} + // filterBatches to only root spans or kind==server. Does not modify the input // but returns a new struct referencing the same input pointers. Returns nil // if there were no matching spans. diff --git a/modules/querier/querier.go b/modules/querier/querier.go index 9b232dca48b..702f0d003c0 100644 --- a/modules/querier/querier.go +++ b/modules/querier/querier.go @@ -710,35 +710,36 @@ func (q *Querier) SpanMetricsSummary( } // Combine the results - yyy := make(map[traceqlmetrics.MetricSeries]*traceqlmetrics.LatencyHistogram) - xxx := make(map[traceqlmetrics.MetricSeries]*tempopb.SpanMetricsSummary) + yyy := make(map[traceqlmetrics.MetricKeys]*traceqlmetrics.LatencyHistogram) + xxx := make(map[traceqlmetrics.MetricKeys]*tempopb.SpanMetricsSummary) var h *traceqlmetrics.LatencyHistogram var s traceqlmetrics.MetricSeries for _, r := range results { for _, m := range r.Metrics { s = protoToMetricSeries(m.Series) + k := s.MetricKeys() - if _, ok := xxx[s]; !ok { - xxx[s] = &tempopb.SpanMetricsSummary{Series: m.Series} + if _, ok := xxx[k]; !ok { + xxx[k] = &tempopb.SpanMetricsSummary{Series: m.Series} } - xxx[s].ErrorSpanCount += m.Errors + xxx[k].ErrorSpanCount += m.Errors var b [64]int for _, l := range m.GetLatencyHistogram() { // Reconstitude the bucket b[l.Bucket] += int(l.Count) // Add to the total - xxx[s].SpanCount += l.Count + xxx[k].SpanCount += l.Count } // Combine the histogram h = traceqlmetrics.New(b) - if _, ok := yyy[s]; !ok { - yyy[s] = h + if _, ok := yyy[k]; !ok { + yyy[k] = h } else { - yyy[s].Combine(*h) + yyy[k].Combine(*h) } } } @@ -1095,18 +1096,30 @@ func protoToMetricSeries(proto []*tempopb.KeyValue) traceqlmetrics.MetricSeries return r } -func protoToTraceQLStatic(proto *tempopb.KeyValue) traceqlmetrics.KeyValue { +func protoToTraceQLStatic(kv *tempopb.KeyValue) traceqlmetrics.KeyValue { + var val traceql.Static + + switch traceql.StaticType(kv.Value.Type) { + case traceql.TypeInt: + val = traceql.NewStaticInt(int(kv.Value.N)) + case traceql.TypeFloat: + val = traceql.NewStaticFloat(kv.Value.F) + case traceql.TypeString: + val = traceql.NewStaticString(kv.Value.S) + case traceql.TypeBoolean: + val = traceql.NewStaticBool(kv.Value.B) + case traceql.TypeDuration: + val = traceql.NewStaticDuration(time.Duration(kv.Value.D)) + case traceql.TypeStatus: + val = traceql.NewStaticStatus(traceql.Status(kv.Value.Status)) + case traceql.TypeKind: + val = traceql.NewStaticKind(traceql.Kind(kv.Value.Kind)) + default: + val = traceql.NewStaticNil() + } + return traceqlmetrics.KeyValue{ - Key: proto.Key, - Value: traceql.Static{ - Type: traceql.StaticType(proto.Value.Type), - N: int(proto.Value.N), - F: proto.Value.F, - S: proto.Value.S, - B: proto.Value.B, - D: time.Duration(proto.Value.D), - Status: traceql.Status(proto.Value.Status), - Kind: traceql.Kind(proto.Value.Kind), - }, + Key: kv.Key, + Value: val, } } diff --git a/pkg/traceql/ast.go b/pkg/traceql/ast.go index aeb528c976a..414c429cf51 100644 --- a/pkg/traceql/ast.go +++ b/pkg/traceql/ast.go @@ -1,10 +1,15 @@ package traceql import ( + "bytes" + "cmp" "fmt" + "hash/fnv" "math" "regexp" + "slices" "time" + "unsafe" "github.com/grafana/tempo/pkg/tempopb" ) @@ -136,13 +141,13 @@ func (p Pipeline) evaluate(input []*Spanset) (result []*Spanset, err error) { type GroupOperation struct { Expression FieldExpression - groupBuffer map[Static]*Spanset + groupBuffer map[StaticMapKey]*Spanset } func newGroupOperation(e FieldExpression) GroupOperation { return GroupOperation{ Expression: e, - groupBuffer: make(map[Static]*Spanset), + groupBuffer: make(map[StaticMapKey]*Spanset), } } @@ -326,11 +331,7 @@ func (f *SpansetFilter) evaluate(input []*Spanset) ([]*Spanset, error) { return nil, err } - if result.Type != TypeBoolean { - continue - } - - if !result.B { + if b, ok := result.Bool(); !ok || !b { continue } @@ -475,204 +476,407 @@ func (o UnaryOperation) referencesSpan() bool { // ********************** // Statics // ********************** + type Static struct { - Type StaticType - N int - F float64 - S string - B bool - D time.Duration - Status Status // todo: can we just use the N member for status and kind? - Kind Kind + Type StaticType + + valScalar uint64 // used for int, float64, bool, time.Duration, Kind, and Status + valBytes []byte // used for string, []int, []float64, []bool + valStrings []string // used for []string } -// nolint: revive -func (Static) __fieldExpression() {} +type StaticMapKey struct { + typ StaticType + code uint64 + str string +} -// nolint: revive -func (Static) __scalarExpression() {} +func NewStaticNil() Static { + return Static{Type: TypeNil} +} -func (Static) referencesSpan() bool { - return false +func NewStaticInt(i int) Static { + return Static{ + Type: TypeInt, + valScalar: uint64(i), + } } -func (s Static) impliedType() StaticType { - return s.Type +func NewStaticFloat(f float64) Static { + return Static{ + Type: TypeFloat, + valScalar: math.Float64bits(f), + } } -func (s Static) Equals(other Static) bool { - // if they are different number types. compare them as floats. however, if they are the same type just fall through to - // a normal comparison which should be more efficient - differentNumberTypes := (s.Type == TypeInt || s.Type == TypeFloat || s.Type == TypeDuration) && - (other.Type == TypeInt || other.Type == TypeFloat || other.Type == TypeDuration) && - s.Type != other.Type - if differentNumberTypes { - return s.asFloat() == other.asFloat() +func NewStaticString(s string) Static { + return Static{ + Type: TypeString, + valBytes: unsafe.Slice(unsafe.StringData(s), len(s)), } +} - eitherIsTypeStatus := (s.Type == TypeStatus && other.Type == TypeInt) || (other.Type == TypeStatus && s.Type == TypeInt) - if eitherIsTypeStatus { - if s.Type == TypeStatus { - return s.Status == Status(other.N) - } - return Status(s.N) == other.Status +func NewStaticBool(b bool) Static { + var val uint64 + if b { + val = 1 + } + return Static{ + Type: TypeBoolean, + valScalar: val, } +} - // no special cases, just compare directly - return s == other +func NewStaticDuration(d time.Duration) Static { + return Static{ + Type: TypeDuration, + valScalar: uint64(d), + } } -func (s Static) compare(other *Static) int { - if s.Type != other.Type { - if s.asFloat() > other.asFloat() { - return 1 - } else if s.asFloat() < other.asFloat() { - return -1 - } +func NewStaticStatus(s Status) Static { + return Static{ + Type: TypeStatus, + valScalar: uint64(s), + } +} - return 0 +func NewStaticKind(k Kind) Static { + return Static{ + Type: TypeKind, + valScalar: uint64(k), } +} +func NewStaticIntArray(i []int) Static { + if i == nil { + return Static{Type: TypeIntArray} + } + if len(i) == 0 { + return Static{Type: TypeIntArray, valBytes: []byte{}} + } + + numBytes := uintptr(len(i)) * unsafe.Sizeof(i[0]) + return Static{ + Type: TypeIntArray, + valBytes: unsafe.Slice((*byte)(unsafe.Pointer(&i[0])), numBytes), + } +} + +func NewStaticFloatArray(f []float64) Static { + if f == nil { + return Static{Type: TypeFloatArray} + } + if len(f) == 0 { + return Static{Type: TypeFloatArray, valBytes: []byte{}} + } + + numBytes := uintptr(len(f)) * unsafe.Sizeof(f[0]) + return Static{ + Type: TypeFloatArray, + valBytes: unsafe.Slice((*byte)(unsafe.Pointer(&f[0])), numBytes), + } +} + +func NewStaticStringArray(s []string) Static { + if s == nil { + return Static{Type: TypeStringArray} + } + if len(s) == 0 { + return Static{Type: TypeStringArray, valStrings: []string{}} + } + + return Static{ + Type: TypeStringArray, + valStrings: s, + } +} + +func NewStaticBooleanArray(b []bool) Static { + if b == nil { + return Static{Type: TypeBooleanArray} + } + if len(b) == 0 { + return Static{Type: TypeBooleanArray, valBytes: []byte{}} + } + + return Static{ + Type: TypeBooleanArray, + valBytes: unsafe.Slice((*byte)(unsafe.Pointer(&b[0])), len(b)), + } +} + +var seedBytes = []byte{204, 38, 247, 160, 15, 37, 67, 77} + +func (s Static) MapKey() StaticMapKey { switch s.Type { - case TypeInt: - if s.N > other.N { - return 1 - } else if s.N < other.N { - return -1 + case TypeNil: + return StaticMapKey{typ: TypeNil} + case TypeString: + var str string + if len(s.valBytes) > 0 { + str = unsafe.String(unsafe.SliceData(s.valBytes), len(s.valBytes)) } - case TypeFloat: - if s.F > other.F { - return 1 - } else if s.F < other.F { - return -1 + return StaticMapKey{typ: s.Type, str: str} + case TypeIntArray, TypeFloatArray, TypeBooleanArray: + if len(s.valBytes) == 0 { + return StaticMapKey{typ: s.Type} } - case TypeDuration: - if s.D > other.D { - return 1 - } else if s.D < other.D { - return -1 + + h := fnv.New64a() + _, _ = h.Write(s.valBytes) + return StaticMapKey{typ: s.Type, code: h.Sum64()} + case TypeStringArray: + if len(s.valStrings) == 0 { + return StaticMapKey{typ: s.Type} } - case TypeString: - if s.S > other.S { - return 1 - } else if s.S < other.S { - return -1 + + h := fnv.New64a() + _, _ = h.Write(seedBytes) // avoid collisions with values like []string{""} + for _, str := range s.valStrings { + _, _ = h.Write(unsafe.Slice(unsafe.StringData(str), len(str))) } - case TypeBoolean: - if s.B && !other.B { - return 1 - } else if !s.B && other.B { - return -1 + return StaticMapKey{typ: s.Type, code: h.Sum64()} + default: + return StaticMapKey{typ: s.Type, code: s.valScalar} + } +} + +func (s Static) Equals(o *Static) bool { + switch s.Type { + case TypeInt, TypeDuration: + switch o.Type { + case TypeInt, TypeDuration: + return s.valScalar == o.valScalar + case TypeStatus: + // only int can be compared to status + return s.Type == TypeInt && s.valScalar == o.valScalar + case TypeFloat: + of := math.Float64frombits(o.valScalar) + return s.Float() == of + default: + return false } case TypeStatus: - if s.Status > other.Status { - return 1 - } else if s.Status < other.Status { - return -1 - } - case TypeKind: - if s.Kind > other.Kind { - return 1 - } else if s.Kind < other.Kind { - return -1 + switch o.Type { + case TypeInt, TypeStatus: + return s.valScalar == o.valScalar + default: + return false } + case TypeFloat: + sf := math.Float64frombits(s.valScalar) + return sf == o.Float() + case TypeKind, TypeBoolean: + return s.Type == o.Type && s.valScalar == o.valScalar + case TypeString, TypeIntArray, TypeFloatArray, TypeBooleanArray: + return s.Type == o.Type && bytes.Equal(s.valBytes, o.valBytes) + case TypeStringArray: + return s.Type == o.Type && slices.Equal(s.valStrings, o.valStrings) + case TypeNil: + return o.Type == TypeNil + default: + // should not be reached + return false } - - return 0 } -func (s *Static) sumInto(other Static) { +func (s Static) StrictEquals(o *Static) bool { + if s.Type != o.Type { + return false + } + switch s.Type { - case TypeInt: - s.N += other.N case TypeFloat: - s.F += other.F - case TypeDuration: - s.D += other.D + sf := math.Float64frombits(s.valScalar) + of := math.Float64frombits(o.valScalar) + return sf == of + case TypeString, TypeIntArray, TypeFloatArray, TypeBooleanArray: + return bytes.Equal(s.valBytes, o.valBytes) + case TypeStringArray: + return slices.Equal(s.valStrings, o.valStrings) + case TypeNil: + return true + default: + return s.valScalar == o.valScalar } } -func (s Static) divideBy(f float64) Static { +func (s Static) compare(o *Static) int { + if s.Type != o.Type { + if s.isNumeric() && o.isNumeric() { + return cmp.Compare(s.Float(), o.Float()) + } + return cmp.Compare(s.Type, o.Type) + } + switch s.Type { - case TypeInt: - return NewStaticFloat(float64(s.N) / f) // there's no integer division in traceql - case TypeFloat: - return NewStaticFloat(s.F / f) - case TypeDuration: - return NewStaticDuration(s.D / time.Duration(f)) + case TypeString, TypeBooleanArray: + return bytes.Compare(s.valBytes, o.valBytes) + case TypeIntArray: + sa, _ := s.IntArray() + oa, _ := o.IntArray() + return slices.Compare(sa, oa) + case TypeFloatArray: + sa, _ := s.FloatArray() + oa, _ := o.FloatArray() + return slices.Compare(sa, oa) + case TypeStringArray: + return slices.Compare(s.valStrings, o.valStrings) + case TypeNil: + return 0 + default: + return cmp.Compare(int64(s.valScalar), int64(o.valScalar)) } +} - return s +func (s Static) Int() (int, bool) { + if s.Type != TypeInt { + return 0, false + } + return int(s.valScalar), true } -func (s Static) asFloat() float64 { +func (s Static) Float() float64 { switch s.Type { - case TypeInt: - return float64(s.N) case TypeFloat: - return s.F + return math.Float64frombits(s.valScalar) + case TypeInt: + return float64(int(s.valScalar)) case TypeDuration: - return float64(s.D.Nanoseconds()) + return float64(int64(s.valScalar)) default: return math.NaN() } } -func NewStaticInt(n int) Static { - return Static{ - Type: TypeInt, - N: n, +func (s Static) Bool() (bool, bool) { + if s.Type != TypeBoolean { + return false, false } + return s.valScalar != 0, true } -func NewStaticFloat(f float64) Static { - return Static{ - Type: TypeFloat, - F: f, +func (s Static) Duration() (time.Duration, bool) { + if s.Type != TypeDuration { + return 0, false } + return time.Duration(s.valScalar), true } -func NewStaticString(s string) Static { - return Static{ - Type: TypeString, - S: s, +func (s Static) Status() (Status, bool) { + if s.Type != TypeStatus { + return 0, false } + return Status(s.valScalar), true } -func NewStaticBool(b bool) Static { - return Static{ - Type: TypeBoolean, - B: b, +func (s Static) Kind() (Kind, bool) { + if s.Type != TypeKind { + return 0, false } + return Kind(s.valScalar), true } -func NewStaticNil() Static { - return Static{ - Type: TypeNil, +func (s Static) IntArray() ([]int, bool) { + if s.Type != TypeIntArray { + return nil, false } + + if s.valBytes == nil { + return nil, true + } + if len(s.valBytes) == 0 { + return []int{}, true + } + numInts := uintptr(len(s.valBytes)) / unsafe.Sizeof(int(0)) + return unsafe.Slice((*int)(unsafe.Pointer(&s.valBytes[0])), numInts), true } -func NewStaticDuration(d time.Duration) Static { - return Static{ - Type: TypeDuration, - D: d, +func (s Static) FloatArray() ([]float64, bool) { + if s.Type != TypeFloatArray { + return nil, false + } + + if s.valBytes == nil { + return nil, true + } + if len(s.valBytes) == 0 { + return []float64{}, true } + numFloats := uintptr(len(s.valBytes)) / unsafe.Sizeof(float64(0)) + return unsafe.Slice((*float64)(unsafe.Pointer(&s.valBytes[0])), numFloats), true } -func NewStaticStatus(s Status) Static { - return Static{ - Type: TypeStatus, - Status: s, +func (s Static) StringArray() ([]string, bool) { + if s.Type != TypeStringArray { + return nil, false + } + + return s.valStrings, true +} + +func (s Static) BooleanArray() ([]bool, bool) { + if s.Type != TypeBooleanArray { + return nil, false + } + + if s.valBytes == nil { + return nil, true + } + if len(s.valBytes) == 0 { + return []bool{}, true } + return unsafe.Slice((*bool)(unsafe.Pointer(&s.valBytes[0])), len(s.valBytes)), true } -func NewStaticKind(k Kind) Static { - return Static{ - Type: TypeKind, - Kind: k, +func (s Static) isNumeric() bool { + return s.Type.isNumeric() +} + +func (s *Static) sumInto(o *Static) { + if s.Type != o.Type { + return + } + switch s.Type { + case TypeInt: + s.valScalar = uint64(int(s.valScalar) + int(o.valScalar)) + case TypeDuration: + s.valScalar = uint64(time.Duration(s.valScalar) + time.Duration(o.valScalar)) + case TypeFloat: + sf := math.Float64frombits(s.valScalar) + of := math.Float64frombits(o.valScalar) + s.valScalar = math.Float64bits(sf + of) + } +} + +func (s Static) divideBy(f float64) Static { + switch s.Type { + case TypeInt: + return NewStaticFloat(float64(s.valScalar) / f) // there's no integer division in traceql + case TypeDuration: + return NewStaticDuration(time.Duration(s.valScalar) / time.Duration(f)) + case TypeFloat: + sf := math.Float64frombits(s.valScalar) + return NewStaticFloat(sf / f) } + return s } +func (Static) referencesSpan() bool { + return false +} + +func (s Static) impliedType() StaticType { + return s.Type +} + +// nolint: revive +func (Static) __fieldExpression() {} + +// nolint: revive +func (Static) __scalarExpression() {} + // ********************** // Attributes // ********************** @@ -878,7 +1082,7 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode byFunc = func(s Span) (Static, bool) { d := s.DurationNanos() if d < 2 { - return Static{}, false + return NewStaticNil(), false } // Bucket is log2(nanos) converted to float seconds return NewStaticFloat(Log2Bucketize(d) / float64(time.Second)), true @@ -888,20 +1092,21 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode byFunc = func(s Span) (Static, bool) { v, ok := s.AttributeFor(a.attr) if !ok { - return Static{}, false + return NewStaticNil(), false } // TODO(mdisibio) - Add support for floats, we need to map them into buckets. // Because of the range of floats, we need a native histogram approach. - if v.Type != TypeInt { - return Static{}, false + n, ok := v.Int() + if !ok { + return NewStaticNil(), false } - if v.N < 2 { - return Static{}, false + if n < 2 { + return NewStaticNil(), false } // Bucket is the value rounded up to the nearest power of 2 - return NewStaticFloat(Log2Bucketize(uint64(v.N))), true + return NewStaticFloat(Log2Bucketize(uint64(n))), true } } @@ -915,7 +1120,7 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode byFunc = func(s Span) (Static, bool) { d := s.DurationNanos() if d < 2 { - return Static{}, false + return NewStaticNil(), false } // Bucket is in seconds return NewStaticFloat(Log2Bucketize(d) / float64(time.Second)), true @@ -925,19 +1130,19 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode byFunc = func(s Span) (Static, bool) { v, ok := s.AttributeFor(a.attr) if !ok { - return Static{}, false + return NewStaticNil(), false } // TODO(mdisibio) - Add support for floats, we need to map them into buckets. // Because of the range of floats, we need a native histogram approach. if v.Type != TypeInt { - return Static{}, false + return NewStaticNil(), false } - - if v.N < 2 { - return Static{}, false + n, _ := v.Int() + if n < 2 { + return NewStaticNil(), false } - return NewStaticFloat(Log2Bucketize(uint64(v.N))), true + return NewStaticFloat(Log2Bucketize(uint64(n))), true } } } diff --git a/pkg/traceql/ast_conditions.go b/pkg/traceql/ast_conditions.go index 599d6540575..fb4540cdd12 100644 --- a/pkg/traceql/ast_conditions.go +++ b/pkg/traceql/ast_conditions.go @@ -13,7 +13,11 @@ func (f SpansetFilter) extractConditions(request *FetchSpansRequest) { // For empty spansets { } ensure there is something that matches all spans. // Use start time which would have been selected as part of the second pass // metadata, and is still fairly efficient to pull back. - if s, ok := f.Expression.(Static); ok && s.Type == TypeBoolean && s.B { + if s, ok := f.Expression.(Static); ok { + if b, ok := s.Bool(); !ok || !b { + return + } + for _, c := range request.Conditions { if c.Attribute.Intrinsic != IntrinsicNone && c.Op == OpNone { // A different match-all intrinsic is already present. diff --git a/pkg/traceql/ast_execute.go b/pkg/traceql/ast_execute.go index 15574678951..f3553bf28f8 100644 --- a/pkg/traceql/ast_execute.go +++ b/pkg/traceql/ast_execute.go @@ -30,14 +30,14 @@ func (g GroupOperation) evaluate(ss []*Spanset) ([]*Spanset, error) { } // Check if the result already has a group in the map - group, ok := groups[result] + group, ok := groups[result.MapKey()] if !ok { // If not, create a new group and add it to the map group = &Spanset{} // copy all existing attributes forward group.Attributes = append(group.Attributes, spanset.Attributes...) group.AddAttribute(g.String(), result) - groups[result] = group + groups[result.MapKey()] = group } // Add the current spanset to the group @@ -256,7 +256,7 @@ func (a Aggregate) evaluate(input []*Spanset) (output []*Spanset, err error) { if sum == nil { sum = &val } else { - sum.sumInto(val) + sum.sumInto(&val) } count++ } @@ -267,34 +267,34 @@ func (a Aggregate) evaluate(input []*Spanset) (output []*Spanset, err error) { output = append(output, cpy) case aggregateMax: - var max *Static + var maxS *Static for _, s := range ss.Spans { val, err := a.e.execute(s) if err != nil { return nil, err } - if max == nil || val.compare(max) == 1 { - max = &val + if maxS == nil || val.compare(maxS) > 0 { + maxS = &val } } cpy := ss.clone() - cpy.Scalar = *max + cpy.Scalar = *maxS cpy.AddAttribute(a.String(), cpy.Scalar) output = append(output, cpy) case aggregateMin: - var min *Static + var minS *Static for _, s := range ss.Spans { val, err := a.e.execute(s) if err != nil { return nil, err } - if min == nil || val.compare(min) == -1 { - min = &val + if minS == nil || val.compare(minS) == -1 { + minS = &val } } cpy := ss.clone() - cpy.Scalar = *min + cpy.Scalar = *minS cpy.AddAttribute(a.String(), cpy.Scalar) output = append(output, cpy) @@ -308,7 +308,7 @@ func (a Aggregate) evaluate(input []*Spanset) (output []*Spanset, err error) { if sum == nil { sum = &val } else { - sum.sumInto(val) + sum.sumInto(&val) } } cpy := ss.clone() @@ -336,8 +336,8 @@ func (o *BinaryOperation) execute(span Span) (Static, error) { } // Ensure the resolved types are still valid - lhsT := lhs.impliedType() - rhsT := rhs.impliedType() + lhsT := lhs.Type + rhsT := rhs.Type if !lhsT.isMatchingOperand(rhsT) { return NewStaticBool(false), nil } @@ -356,6 +356,26 @@ func (o *BinaryOperation) execute(span Span) (Static, error) { return NewStaticBool(strings.Compare(lhs.String(), rhs.String()) < 0), nil case OpLessEqual: return NewStaticBool(strings.Compare(lhs.String(), rhs.String()) <= 0), nil + case OpRegex: + if o.compiledExpression == nil { + o.compiledExpression, err = regexp.Compile(rhs.EncodeToString(false)) + if err != nil { + return NewStaticNil(), err + } + } + + matched := o.compiledExpression.MatchString(lhs.EncodeToString(false)) + return NewStaticBool(matched), err + case OpNotRegex: + if o.compiledExpression == nil { + o.compiledExpression, err = regexp.Compile(rhs.EncodeToString(false)) + if err != nil { + return NewStaticNil(), err + } + } + + matched := o.compiledExpression.MatchString(lhs.EncodeToString(false)) + return NewStaticBool(!matched), err default: } } @@ -363,77 +383,70 @@ func (o *BinaryOperation) execute(span Span) (Static, error) { // if both sides are integers then do integer math, otherwise we can drop to the // catch all below if lhsT == TypeInt && rhsT == TypeInt { + lhsN, _ := lhs.Int() + rhsN, _ := rhs.Int() + switch o.Op { case OpAdd: - return NewStaticInt(lhs.N + rhs.N), nil + return NewStaticInt(lhsN + rhsN), nil case OpSub: - return NewStaticInt(lhs.N - rhs.N), nil + return NewStaticInt(lhsN - rhsN), nil case OpDiv: - return NewStaticInt(lhs.N / rhs.N), nil + return NewStaticInt(lhsN / rhsN), nil case OpMod: - return NewStaticInt(lhs.N % rhs.N), nil + return NewStaticInt(lhsN % rhsN), nil case OpMult: - return NewStaticInt(lhs.N * rhs.N), nil + return NewStaticInt(lhsN * rhsN), nil case OpGreater: - return NewStaticBool(lhs.N > rhs.N), nil + return NewStaticBool(lhsN > rhsN), nil case OpGreaterEqual: - return NewStaticBool(lhs.N >= rhs.N), nil + return NewStaticBool(lhsN >= rhsN), nil case OpLess: - return NewStaticBool(lhs.N < rhs.N), nil + return NewStaticBool(lhsN < rhsN), nil case OpLessEqual: - return NewStaticBool(lhs.N <= rhs.N), nil + return NewStaticBool(lhsN <= rhsN), nil case OpPower: - return NewStaticInt(intPow(rhs.N, lhs.N)), nil + return NewStaticInt(intPow(rhsN, lhsN)), nil + } + } + + if lhsT == TypeBoolean && rhsT == TypeBoolean { + lhsB, _ := lhs.Bool() + rhsB, _ := rhs.Bool() + + switch o.Op { + case OpAnd: + return NewStaticBool(lhsB && rhsB), nil + case OpOr: + return NewStaticBool(lhsB || rhsB), nil } } switch o.Op { case OpAdd: - return NewStaticFloat(lhs.asFloat() + rhs.asFloat()), nil + return NewStaticFloat(lhs.Float() + rhs.Float()), nil case OpSub: - return NewStaticFloat(lhs.asFloat() - rhs.asFloat()), nil + return NewStaticFloat(lhs.Float() - rhs.Float()), nil case OpDiv: - return NewStaticFloat(lhs.asFloat() / rhs.asFloat()), nil + return NewStaticFloat(lhs.Float() / rhs.Float()), nil case OpMod: - return NewStaticFloat(math.Mod(lhs.asFloat(), rhs.asFloat())), nil + return NewStaticFloat(math.Mod(lhs.Float(), rhs.Float())), nil case OpMult: - return NewStaticFloat(lhs.asFloat() * rhs.asFloat()), nil + return NewStaticFloat(lhs.Float() * rhs.Float()), nil case OpGreater: - return NewStaticBool(lhs.asFloat() > rhs.asFloat()), nil + return NewStaticBool(lhs.Float() > rhs.Float()), nil case OpGreaterEqual: - return NewStaticBool(lhs.asFloat() >= rhs.asFloat()), nil + return NewStaticBool(lhs.Float() >= rhs.Float()), nil case OpLess: - return NewStaticBool(lhs.asFloat() < rhs.asFloat()), nil + return NewStaticBool(lhs.Float() < rhs.Float()), nil case OpLessEqual: - return NewStaticBool(lhs.asFloat() <= rhs.asFloat()), nil + return NewStaticBool(lhs.Float() <= rhs.Float()), nil case OpPower: - return NewStaticFloat(math.Pow(lhs.asFloat(), rhs.asFloat())), nil + return NewStaticFloat(math.Pow(lhs.Float(), rhs.Float())), nil case OpEqual: - return NewStaticBool(lhs.Equals(rhs)), nil + return NewStaticBool(lhs.Equals(&rhs)), nil case OpNotEqual: - return NewStaticBool(!lhs.Equals(rhs)), nil - case OpRegex: - if o.compiledExpression == nil { - o.compiledExpression, err = regexp.Compile(rhs.S) - if err != nil { - return NewStaticNil(), err - } - } - matched := o.compiledExpression.MatchString(lhs.S) - return NewStaticBool(matched), err - case OpNotRegex: - if o.compiledExpression == nil { - o.compiledExpression, err = regexp.Compile(rhs.S) - if err != nil { - return NewStaticNil(), err - } - } - matched := o.compiledExpression.MatchString(lhs.S) - return NewStaticBool(!matched), err - case OpAnd: - return NewStaticBool(lhs.B && rhs.B), nil - case OpOr: - return NewStaticBool(lhs.B || rhs.B), nil + return NewStaticBool(!lhs.Equals(&rhs)), nil default: return NewStaticNil(), errors.New("unexpected operator " + o.Op.String()) } @@ -451,23 +464,31 @@ func binOp(op Operator, lhs, rhs Static) (bool, error) { return false, nil } + if lhsT == TypeBoolean && rhsT == TypeBoolean { + lhsB, _ := lhs.Bool() + rhsB, _ := rhs.Bool() + + switch op { + case OpAnd: + return lhsB && rhsB, nil + case OpOr: + return lhsB || rhsB, nil + } + } + switch op { case OpGreater: - return lhs.asFloat() > rhs.asFloat(), nil + return lhs.Float() > rhs.Float(), nil case OpGreaterEqual: - return lhs.asFloat() >= rhs.asFloat(), nil + return lhs.Float() >= rhs.Float(), nil case OpLess: - return lhs.asFloat() < rhs.asFloat(), nil + return lhs.Float() < rhs.Float(), nil case OpLessEqual: - return lhs.asFloat() <= rhs.asFloat(), nil + return lhs.Float() <= rhs.Float(), nil case OpEqual: - return lhs.Equals(rhs), nil + return lhs.Equals(&rhs), nil case OpNotEqual: - return !lhs.Equals(rhs), nil - case OpAnd: - return lhs.B && rhs.B, nil - case OpOr: - return lhs.B || rhs.B, nil + return !lhs.Equals(&rhs), nil } return false, errors.New("unexpected operator " + op.String()) @@ -483,7 +504,8 @@ func (o UnaryOperation) execute(span Span) (Static, error) { if static.Type != TypeBoolean { return NewStaticNil(), fmt.Errorf("expression (%v) expected a boolean, but got %v", o, static.Type) } - return NewStaticBool(!static.B), nil + b, _ := static.Bool() + return NewStaticBool(!b), nil } if o.Op == OpSub { if !static.Type.isNumeric() { @@ -491,11 +513,13 @@ func (o UnaryOperation) execute(span Span) (Static, error) { } switch static.Type { case TypeInt: - return NewStaticInt(-1 * static.N), nil + n, _ := static.Int() + return NewStaticInt(-1 * n), nil case TypeFloat: - return NewStaticFloat(-1 * static.F), nil + return NewStaticFloat(-1 * static.Float()), nil case TypeDuration: - return NewStaticDuration(-1 * static.D), nil + d, _ := static.Duration() + return NewStaticDuration(-1 * d), nil } } diff --git a/pkg/traceql/ast_stringer.go b/pkg/traceql/ast_stringer.go index d7c846f4aa7..e9505ea5066 100644 --- a/pkg/traceql/ast_stringer.go +++ b/pkg/traceql/ast_stringer.go @@ -4,6 +4,7 @@ import ( "fmt" "strconv" "strings" + "unsafe" ) func (r RootExpr) String() string { @@ -76,34 +77,72 @@ func (o UnaryOperation) String() string { return unaryOp(o.Op, o.Expression) } -func (n Static) String() string { - return n.EncodeToString(true) +func (s Static) String() string { + return s.EncodeToString(true) } -func (n Static) EncodeToString(quotes bool) string { - switch n.Type { +func (s Static) EncodeToString(quotes bool) string { + switch s.Type { + case TypeNil: + return "nil" case TypeInt: - return strconv.Itoa(n.N) + i, _ := s.Int() + return strconv.Itoa(i) case TypeFloat: - return strconv.FormatFloat(n.F, 'g', -1, 64) + return strconv.FormatFloat(s.Float(), 'g', -1, 64) case TypeString: + var str string + if len(s.valBytes) > 0 { + str = unsafe.String(unsafe.SliceData(s.valBytes), len(s.valBytes)) + } if quotes { - return "`" + n.S + "`" + return "`" + str + "`" } - return n.S + return str case TypeBoolean: - return strconv.FormatBool(n.B) - case TypeNil: - return "nil" + b, _ := s.Bool() + return strconv.FormatBool(b) case TypeDuration: - return n.D.String() + d, _ := s.Duration() + return d.String() case TypeStatus: - return n.Status.String() + st, _ := s.Status() + return st.String() case TypeKind: - return n.Kind.String() + k, _ := s.Kind() + return k.String() + case TypeIntArray: + ints, _ := s.IntArray() + return arrayToString(ints, false) + case TypeFloatArray: + floats, _ := s.FloatArray() + return arrayToString(floats, false) + case TypeStringArray: + return arrayToString(s.valStrings, true) + case TypeBooleanArray: + booleans, _ := s.BooleanArray() + return arrayToString(booleans, false) + default: + return fmt.Sprintf("static(%d)", s.Type) + } +} + +func arrayToString[T any](array []T, quoted bool) string { + tmpl := "%v" + if quoted { + tmpl = `"%v"` + } + + var s strings.Builder + s.WriteByte('[') + for i, e := range array { + s.WriteString(fmt.Sprintf(tmpl, e)) + if i < len(array)-1 { + s.WriteString(", ") + } } - - return fmt.Sprintf("static(%d)", n.Type) + s.WriteRune(']') + return s.String() } func (a Attribute) String() string { diff --git a/pkg/traceql/ast_test.go b/pkg/traceql/ast_test.go index 3343bbd449c..1817dfcd4f6 100644 --- a/pkg/traceql/ast_test.go +++ b/pkg/traceql/ast_test.go @@ -2,6 +2,7 @@ package traceql import ( "fmt" + "math" "testing" "time" @@ -9,19 +10,448 @@ import ( "github.com/stretchr/testify/require" ) +func TestNewStaticNil(t *testing.T) { + s := NewStaticNil() + assert.Equal(t, TypeNil, s.Type) + assert.Equal(t, Static{}, s) +} + +func TestStatic_Int(t *testing.T) { + tests := []struct { + arg any + ok bool + }{ + // supported values + {arg: -math.MaxInt, ok: true}, + {arg: -1, ok: true}, + {arg: 0, ok: true}, + {arg: 1, ok: true}, + {arg: math.MaxInt, ok: true}, + // unsupported values + {arg: "test"}, + {arg: 3.14}, + {arg: true}, + {arg: StatusOk}, + {arg: KindClient}, + {arg: []int{1}}, + {arg: []float64{1.0}}, + {arg: []bool{true}}, + {arg: []string{"test"}}, + } + + for _, tt := range tests { + t.Run(testName(tt.arg), func(t *testing.T) { + static := newStatic(tt.arg) + i, ok := static.Int() + + require.Equal(t, tt.ok, ok) + if tt.ok { + assert.Equal(t, tt.arg, i) + } + }) + } +} + +func TestStatic_Float(t *testing.T) { + tests := []struct { + arg any + want float64 + }{ + {arg: -math.MaxFloat64}, + {arg: -1.0}, + {arg: 0.0}, + {arg: 10.0 + math.SmallestNonzeroFloat64}, + {arg: 1.0}, + {arg: math.MaxFloat64}, + {arg: int(3117), want: 3117.0}, + {arg: time.Duration(101), want: 101.0}, + {arg: "test", want: math.NaN()}, + {arg: true, want: math.NaN()}, + {arg: StatusError, want: math.NaN()}, + {arg: KindServer, want: math.NaN()}, + {arg: []int{1}, want: math.NaN()}, + {arg: []float64{1.0}, want: math.NaN()}, + {arg: []bool{true}, want: math.NaN()}, + {arg: []string{"test"}, want: math.NaN()}, + } + + for _, tt := range tests { + t.Run(testName(tt.arg), func(t *testing.T) { + static := newStatic(tt.arg) + f := static.Float() + + if static.Type == TypeFloat { + assert.Equal(t, tt.arg, f) + } else if math.IsNaN(tt.want) { + assert.True(t, math.IsNaN(f)) + } else { + assert.Equal(t, tt.want, f) + } + }) + } +} + +func TestStatic_String(t *testing.T) { + tests := []struct { + arg any + want string + }{ + {arg: nil, want: "nil"}, + {arg: 101, want: "101"}, + {arg: -10, want: "-10"}, + {arg: -1.0, want: "-1"}, + {arg: 0.0, want: "0"}, + {arg: 10.0, want: "10"}, + {arg: "test", want: "`test`"}, + {arg: true, want: "true"}, + {arg: StatusOk, want: "ok"}, + {arg: KindClient, want: "client"}, + {arg: time.Duration(70) * time.Second, want: "1m10s"}, + {arg: []int{1, 2, 3}, want: "[1, 2, 3]"}, + {arg: []float64{1.1, 3.3}, want: "[1.1, 3.3]"}, + {arg: []bool{true, true, false}, want: "[true, true, false]"}, + {arg: []string{"aa", "bb"}, want: `["aa", "bb"]`}, + } + + for _, tt := range tests { + t.Run(testName(tt.arg), func(t *testing.T) { + static := newStatic(tt.arg) + assert.Equal(t, tt.want, static.String()) + }) + } +} + +func TestStatic_Bool(t *testing.T) { + tests := []struct { + arg any + ok bool + }{ + // supported values + {arg: false, ok: true}, + {arg: true, ok: true}, + // unsupported values + {arg: 3.14}, + {arg: "test"}, + {arg: time.Duration(1)}, + {arg: StatusOk}, + {arg: KindClient}, + {arg: []int{1, 2, 3}}, + {arg: []float64{1.0, 2.2}}, + {arg: []bool{true, true}}, + {arg: []string{"aa", "bb"}}, + } + + for _, tt := range tests { + t.Run(testName(tt.arg), func(t *testing.T) { + static := newStatic(tt.arg) + b, ok := static.Bool() + + require.Equal(t, tt.ok, ok) + if tt.ok { + assert.Equal(t, tt.arg, b) + } + }) + } +} + +func TestStatic_Duration(t *testing.T) { + tests := []struct { + arg any + ok bool + }{ + // supported values + {arg: time.Duration(0), ok: true}, + {arg: time.Duration(1), ok: true}, + {arg: time.Duration(100) * time.Second, ok: true}, + // unsupported values + {arg: 1}, + {arg: 3.14}, + {arg: "test"}, + {arg: true}, + {arg: StatusOk}, + {arg: KindClient}, + {arg: []int{1, 2, 3}}, + {arg: []float64{1.0, 2.2}}, + {arg: []bool{true, true}}, + {arg: []string{"aa", "bb"}}, + } + + for _, tt := range tests { + t.Run(testName(tt.arg), func(t *testing.T) { + static := newStatic(tt.arg) + d, ok := static.Duration() + + require.Equal(t, tt.ok, ok) + if tt.ok { + assert.Equal(t, tt.arg, d) + } + }) + } +} + +func TestStatic_Status(t *testing.T) { + tests := []struct { + arg any + ok bool + }{ + // supported values + {arg: StatusError, ok: true}, + {arg: StatusOk, ok: true}, + {arg: StatusUnset, ok: true}, + // unsupported values + {arg: 1}, + {arg: 3.14}, + {arg: "test"}, + {arg: true}, + {arg: KindClient}, + {arg: []int{1, 2, 3}}, + {arg: []float64{1.0, 2.2}}, + {arg: []bool{true, true}}, + {arg: []string{"aa", "bb"}}, + } + + for _, tt := range tests { + t.Run(testName(tt.arg), func(t *testing.T) { + static := newStatic(tt.arg) + s, ok := static.Status() + + require.Equal(t, tt.ok, ok) + if tt.ok { + assert.Equal(t, tt.arg, s) + } + }) + } +} + +func TestStatic_Kind(t *testing.T) { + tests := []struct { + arg any + ok bool + }{ + // supported values + {arg: KindUnspecified, ok: true}, + {arg: KindInternal, ok: true}, + {arg: KindClient, ok: true}, + {arg: KindServer, ok: true}, + {arg: KindProducer, ok: true}, + {arg: KindConsumer, ok: true}, + // unsupported values + {arg: 1}, + {arg: 3.14}, + {arg: "test"}, + {arg: true}, + {arg: StatusOk}, + {arg: []int{1, 2, 3}}, + {arg: []float64{1.0, 2.2}}, + {arg: []bool{true, true}}, + {arg: []string{"aa", "bb"}}, + } + + for _, tt := range tests { + t.Run(testName(tt.arg), func(t *testing.T) { + static := newStatic(tt.arg) + k, ok := static.Kind() + + require.Equal(t, tt.ok, ok) + if tt.ok { + assert.Equal(t, tt.arg, k) + } + }) + } +} + +func TestStatic_IntArray(t *testing.T) { + tests := []struct { + arg any + ok bool + }{ + // supported values + {arg: []int(nil), ok: true}, + {arg: []int{}, ok: true}, + {arg: []int{1}, ok: true}, + {arg: []int{1, 2, 3, 4, 5}, ok: true}, + // unsupported values + {arg: 1}, + {arg: 3.14}, + {arg: "test"}, + {arg: true}, + {arg: StatusOk}, + {arg: KindClient}, + {arg: []float64{1.0, 2.2}}, + {arg: []bool{true, true}}, + {arg: []string{"aa", "bb"}}, + } + + for _, tt := range tests { + t.Run(testName(tt.arg), func(t *testing.T) { + static := newStatic(tt.arg) + a, ok := static.IntArray() + + require.Equal(t, tt.ok, ok) + if tt.ok { + assert.Equal(t, tt.arg, a) + } + }) + } +} + +func TestStatic_FloatArray(t *testing.T) { + tests := []struct { + arg any + ok bool + }{ + // supported values + {arg: []float64(nil), ok: true}, + {arg: []float64{}, ok: true}, + {arg: []float64{1.11}, ok: true}, + {arg: []float64{1.11 + math.SmallestNonzeroFloat64}, ok: true}, + {arg: []float64{1.23, 2.0, 3.14, 4.1, 5.15}, ok: true}, + // unsupported values + {arg: 1}, + {arg: 3.14}, + {arg: "test"}, + {arg: true}, + {arg: StatusOk}, + {arg: KindClient}, + {arg: []int{1, 2}}, + {arg: []bool{true, true}}, + {arg: []string{"aa", "bb"}}, + } + + for _, tt := range tests { + t.Run(testName(tt.arg), func(t *testing.T) { + static := newStatic(tt.arg) + f, ok := static.FloatArray() + + require.Equal(t, tt.ok, ok) + if tt.ok { + assert.Equal(t, tt.arg, f) + } + }) + } +} + +func TestStatic_StringArray(t *testing.T) { + tests := []struct { + arg any + ok bool + }{ + // supported values + {arg: []string(nil), ok: true}, + {arg: []string{}, ok: true}, + {arg: []string{""}, ok: true}, + {arg: []string{"aa"}, ok: true}, + {arg: []string{"aa", "bb"}, ok: true}, + // unsupported values + {arg: 1}, + {arg: 3.14}, + {arg: "test"}, + {arg: true}, + {arg: StatusOk}, + {arg: KindClient}, + {arg: []int{1, 2}}, + {arg: []float64{1.0, 2.2}}, + {arg: []bool{true, true}}, + } + + for _, tt := range tests { + t.Run(testName(tt.arg), func(t *testing.T) { + static := newStatic(tt.arg) + s, ok := static.StringArray() + + require.Equal(t, tt.ok, ok) + if tt.ok { + assert.Equal(t, tt.arg, s) + } + }) + } +} + +func TestStatic_BooleanArray(t *testing.T) { + tests := []struct { + arg any + ok bool + }{ + // supported values + {arg: []bool(nil), ok: true}, + {arg: []bool{}, ok: true}, + {arg: []bool{true}, ok: true}, + {arg: []bool{false}, ok: true}, + {arg: []bool{false, true}, ok: true}, + // unsupported values + {arg: 1}, + {arg: 3.14}, + {arg: "test"}, + {arg: true}, + {arg: StatusOk}, + {arg: KindClient}, + {arg: []int{1, 2}}, + {arg: []float64{1.0, 2.2}}, + {arg: []string{"aa", "bb"}}, + } + + for _, tt := range tests { + t.Run(testName(tt.arg), func(t *testing.T) { + static := newStatic(tt.arg) + b, ok := static.BooleanArray() + + require.Equal(t, tt.ok, ok) + if tt.ok { + assert.Equal(t, tt.arg, b) + } + }) + } +} + +func TestStatic_MapKey(t *testing.T) { + staticVals := []Static{ + NewStaticNil(), + NewStaticInt(0), NewStaticInt(1), NewStaticInt(2), NewStaticInt(-2), NewStaticInt(math.MaxInt), NewStaticInt(math.MinInt), + NewStaticDuration(0), NewStaticDuration(2), NewStaticDuration(-2), + NewStaticFloat(0), NewStaticFloat(2), NewStaticFloat(-2), NewStaticFloat(math.SmallestNonzeroFloat64), NewStaticFloat(math.MaxFloat64), NewStaticFloat(-math.MaxFloat64), + NewStaticKind(KindUnspecified), NewStaticKind(KindClient), // corresponds to 0 and 2 + NewStaticStatus(StatusError), NewStaticStatus(StatusUnset), // corresponds to 0 and 2 + NewStaticBool(false), NewStaticBool(true), + NewStaticString(""), NewStaticString("foo"), + NewStaticIntArray([]int{}), NewStaticIntArray([]int{1, 2, 3}), + NewStaticFloatArray([]float64{}), NewStaticFloatArray([]float64{0.0}), NewStaticFloatArray([]float64{0.0, 2.0, 3.0}), + NewStaticStringArray([]string{}), NewStaticStringArray([]string{""}), NewStaticStringArray([]string{"foo"}), + NewStaticBooleanArray([]bool{}), NewStaticBooleanArray([]bool{false}), NewStaticBooleanArray([]bool{true}), + } + + // All above values must have unique MapKey + occurredValues := make(map[StaticMapKey]Static, len(staticVals)) + for _, s := range staticVals { + mk := s.MapKey() + + prev, found := occurredValues[mk] + occurredValues[mk] = s + + assert.False(t, found, "static values produce the same MapKey %s, %s", prev.String(), s.String()) + } +} + func TestStatic_Equals(t *testing.T) { areEqual := []struct { lhs, rhs Static }{ {NewStaticInt(1), NewStaticInt(1)}, {NewStaticFloat(1.5), NewStaticFloat(1.5)}, - {NewStaticInt(1), NewStaticFloat(1)}, + {NewStaticInt(2), NewStaticFloat(2.0)}, {NewStaticString("foo"), NewStaticString("foo")}, {NewStaticBool(true), NewStaticBool(true)}, {NewStaticDuration(1 * time.Second), NewStaticDuration(1000 * time.Millisecond)}, {NewStaticStatus(StatusOk), NewStaticStatus(StatusOk)}, {NewStaticKind(KindClient), NewStaticKind(KindClient)}, {NewStaticDuration(0), NewStaticInt(0)}, + {NewStaticIntArray([]int{}), NewStaticIntArray(nil)}, + {NewStaticIntArray([]int{11, 111}), NewStaticIntArray([]int{11, 111})}, + {NewStaticFloatArray([]float64{}), NewStaticFloatArray(nil)}, + {NewStaticFloatArray([]float64{1.1, 2.2}), NewStaticFloatArray([]float64{1.1, 2.2})}, + {NewStaticStringArray([]string{}), NewStaticStringArray(nil)}, + {NewStaticStringArray([]string{"foo", "bar"}), NewStaticStringArray([]string{"foo", "bar"})}, + {NewStaticBooleanArray([]bool{}), NewStaticBooleanArray(nil)}, + {NewStaticBooleanArray([]bool{true, false}), NewStaticBooleanArray([]bool{true, false})}, // Status and int comparison {NewStaticStatus(StatusError), NewStaticInt(0)}, {NewStaticStatus(StatusOk), NewStaticInt(1)}, @@ -35,19 +465,103 @@ func TestStatic_Equals(t *testing.T) { {NewStaticString("foo"), NewStaticString("bar")}, {NewStaticKind(KindClient), NewStaticKind(KindConsumer)}, {NewStaticStatus(StatusError), NewStaticStatus(StatusOk)}, - {NewStaticStatus(StatusOk), NewStaticInt(0)}, + {NewStaticStatus(StatusOk), NewStaticStatus(StatusUnset)}, + {NewStaticStatus(StatusOk), NewStaticKind(KindInternal)}, {NewStaticStatus(StatusError), NewStaticFloat(0)}, + {NewStaticIntArray([]int{}), NewStaticIntArray([]int{0})}, + {NewStaticIntArray([]int{111, 11}), NewStaticIntArray([]int{11, 111})}, + {NewStaticFloatArray([]float64{}), NewStaticFloatArray([]float64{0.0})}, + {NewStaticFloatArray([]float64{1.1, 2.2}), NewStaticFloatArray([]float64{2.2, 1.1})}, + {NewStaticStringArray([]string{}), NewStaticStringArray([]string{""})}, + {NewStaticStringArray([]string{"foo", "bar"}), NewStaticStringArray([]string{"bar", "foo"})}, + {NewStaticBooleanArray([]bool{}), NewStaticBooleanArray([]bool{true})}, + {NewStaticBooleanArray([]bool{true, false}), NewStaticBooleanArray([]bool{false, true})}, } for _, tt := range areEqual { - t.Run(fmt.Sprintf("%v == %v", tt.lhs, tt.rhs), func(t *testing.T) { - assert.True(t, tt.lhs.Equals(tt.rhs)) - assert.True(t, tt.rhs.Equals(tt.lhs)) + t.Run(fmt.Sprintf("%s==%s", testName(tt.rhs), testName(tt.rhs)), func(t *testing.T) { + assert.True(t, tt.lhs.Equals(&tt.rhs)) + assert.True(t, tt.rhs.Equals(&tt.lhs)) }) } for _, tt := range areNotEqual { - t.Run(fmt.Sprintf("%v != %v", tt.lhs, tt.rhs), func(t *testing.T) { - assert.False(t, tt.lhs.Equals(tt.rhs)) - assert.False(t, tt.rhs.Equals(tt.lhs)) + t.Run(fmt.Sprintf("%s!=%s", testName(tt.lhs), testName(tt.rhs)), func(t *testing.T) { + assert.False(t, tt.lhs.Equals(&tt.rhs)) + assert.False(t, tt.rhs.Equals(&tt.lhs)) + }) + } +} + +func TestStatic_compare(t *testing.T) { + testCases := []struct { + s1, s2 Static + want int + }{ + {s1: NewStaticInt(10), s2: NewStaticInt(5), want: 1}, + {s1: NewStaticInt(5), s2: NewStaticInt(-10), want: 1}, + {s1: NewStaticInt(20), s2: NewStaticInt(20), want: 0}, + {s1: NewStaticFloat(10.5), s2: NewStaticFloat(5.5), want: 1}, + {s1: NewStaticFloat(100.0), s2: NewStaticInt(100), want: 0}, + {s1: NewStaticFloat(100.0), s2: NewStaticInt(50), want: 1}, + {s1: NewStaticString("world"), s2: NewStaticString("hello"), want: 1}, + {s1: NewStaticBool(true), s2: NewStaticBool(false), want: 1}, + {s1: NewStaticDuration(10 * time.Second), s2: NewStaticDuration(5 * time.Second), want: 1}, + {s1: NewStaticDuration(10), s2: NewStaticInt(10), want: 0}, + {s1: NewStaticIntArray([]int{1, 3, 3}), s2: NewStaticIntArray([]int{1, -2, 3}), want: 1}, + {s1: NewStaticIntArray([]int{1, 2, 3}), s2: NewStaticIntArray([]int{1, 2, 3}), want: 0}, + {s1: NewStaticFloatArray([]float64{1.1, math.SmallestNonzeroFloat64}), s2: NewStaticFloatArray([]float64{1.1, 0}), want: 1}, + {s1: NewStaticFloatArray([]float64{1.1, 2.2, 3.3}), s2: NewStaticFloatArray([]float64{1.1, -2.2, 3.3}), want: 1}, + {s1: NewStaticFloatArray([]float64{1.1, 2.2, 3.3}), s2: NewStaticFloatArray([]float64{1.1, 2.2, 3.3}), want: 0}, + {s1: NewStaticStringArray([]string{"a", "b", "c"}), s2: NewStaticStringArray([]string{"a", "b"}), want: 1}, + {s1: NewStaticStringArray([]string{"a", "b"}), s2: NewStaticStringArray([]string{"a", "b"}), want: 0}, + {s1: NewStaticBooleanArray([]bool{true, false, true}), s2: NewStaticBooleanArray([]bool{true, false}), want: 1}, + {s1: NewStaticBooleanArray([]bool{true, false}), s2: NewStaticBooleanArray([]bool{true, false}), want: 0}, + } + + for _, tt := range testCases { + t.Run(fmt.Sprintf("%s<>%s", tt.s1.String(), tt.s2.String()), func(t *testing.T) { + res := tt.s1.compare(&tt.s2) + require.Equal(t, tt.want, res, "s1.compare(s2)") + res = tt.s2.compare(&tt.s1) + require.Equal(t, -tt.want, res, "s2.compare(s1)") + }) + } +} + +func TestStatic_sumInto(t *testing.T) { + tests := []struct { + s1, s2, want Static + }{ + {NewStaticInt(1), NewStaticInt(2), NewStaticInt(3)}, + {NewStaticInt(-3), NewStaticInt(2), NewStaticInt(-1)}, + {NewStaticInt(-3), NewStaticDuration(3), NewStaticInt(-3)}, + {NewStaticDuration(2 * time.Second), NewStaticDuration(1 * time.Second), NewStaticDuration(3 * time.Second)}, + {NewStaticDuration(2 * time.Second), NewStaticInt(3000), NewStaticDuration(2 * time.Second)}, + {NewStaticFloat(1.5), NewStaticFloat(2.5), NewStaticFloat(4.0)}, + {NewStaticFloat(-4.5), NewStaticFloat(2.0), NewStaticFloat(-2.5)}, + {NewStaticFloat(3.14), NewStaticInt(1), NewStaticFloat(3.14)}, + } + for _, tt := range tests { + t.Run(fmt.Sprintf("%s+%s", tt.s1.String(), tt.s2.String()), func(t *testing.T) { + tt.s1.sumInto(&tt.s2) + assert.Equal(t, tt.want, tt.s1, "s1.sumInto(s2)") + }) + } +} + +func TestStatic_divideBy(t *testing.T) { + tests := []struct { + s Static + f float64 + want Static + }{ + {NewStaticInt(10), 2, NewStaticFloat(5)}, + {NewStaticDuration(12 * time.Second), 2.1, NewStaticDuration(6 * time.Second)}, + {NewStaticFloat(12.2), 2.0, NewStaticFloat(6.1)}, + } + for _, tt := range tests { + t.Run(fmt.Sprintf("%s/%.2f", tt.s.String(), tt.f), func(t *testing.T) { + s := tt.s.divideBy(tt.f) + assert.Equal(t, tt.want, s, "s.divideBy(f)") }) } } @@ -300,47 +814,6 @@ func TestSpansetFilterEvaluate(t *testing.T) { } } -func TestStaticCompare(t *testing.T) { - testCases := []struct { - name string - s1 Static - s2 Static - expected int - }{ - { - name: "IntComparison_Greater", - s1: Static{Type: TypeInt, N: 10}, - s2: Static{Type: TypeInt, N: 5}, - expected: 1, - }, - { - name: "FloatComparison_Greater", - s1: Static{Type: TypeFloat, F: 10.5}, - s2: Static{Type: TypeFloat, F: 5.5}, - expected: 1, - }, - { - name: "StringComparison_Less", - s1: Static{Type: TypeString, S: "hello"}, - s2: Static{Type: TypeString, S: "world"}, - expected: -1, - }, - { - name: "BooleanComparison_Greater", - s1: Static{Type: TypeBoolean, B: true}, - s2: Static{Type: TypeBoolean, B: false}, - expected: 1, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - result := tc.s1.compare(&tc.s2) - require.Equal(t, tc.expected, result) - }) - } -} - var _ Span = (*mockSpan)(nil) type mockSpan struct { @@ -474,3 +947,59 @@ func loop(lhs []Span, rhs []Span, falseForAll bool, invert bool, eval func(s1 Sp return out } + +func newStatic(val any) Static { + if val == nil { + return NewStaticNil() + } + + switch v := val.(type) { + case int: + return NewStaticInt(v) + case float64: + return NewStaticFloat(v) + case string: + return NewStaticString(v) + case bool: + return NewStaticBool(v) + case time.Duration: + return NewStaticDuration(v) + case Status: + return NewStaticStatus(v) + case Kind: + return NewStaticKind(v) + case []int: + return NewStaticIntArray(v) + case []float64: + return NewStaticFloatArray(v) + case []string: + return NewStaticStringArray(v) + case []bool: + return NewStaticBooleanArray(v) + default: + panic(fmt.Sprintf("unsupported type %T", val)) + } +} + +func testName(val any) string { + if val == nil { + return "nil" + } + + switch v := val.(type) { + case float64: + return fmt.Sprintf("%e", v) + case []int: + return fmt.Sprintf("[%d]int", len(v)) + case []float64: + return fmt.Sprintf("[%d]float", len(v)) + case []string: + return fmt.Sprintf("[%d]str", len(v)) + case []bool: + return fmt.Sprintf("[%d]bool", len(v)) + case Static: + return v.EncodeToString(false) + default: + return fmt.Sprintf("%v", v) + } +} diff --git a/pkg/traceql/ast_validate.go b/pkg/traceql/ast_validate.go index ebd3c0d2f1c..be0150d0c75 100644 --- a/pkg/traceql/ast_validate.go +++ b/pkg/traceql/ast_validate.go @@ -241,11 +241,7 @@ func (o UnaryOperation) validate() error { return nil } -func (n Static) validate() error { - // if n.Type == TypeNil { - // return newUnsupportedError("nil") - // } - +func (s Static) validate() error { return nil } diff --git a/pkg/traceql/engine.go b/pkg/traceql/engine.go index e7176e4193b..448fcfec020 100644 --- a/pkg/traceql/engine.go +++ b/pkg/traceql/engine.go @@ -293,7 +293,7 @@ func (e *Engine) asTraceSearchMetadata(spanset *Spanset) *tempopb.TraceSearchMet atts := span.AllAttributes() if name, ok := atts[NewIntrinsic(IntrinsicName)]; ok { - tempopbSpan.Name = name.S + tempopbSpan.Name = name.EncodeToString(false) } for attribute, static := range atts { @@ -331,7 +331,9 @@ func (e *Engine) asTraceSearchMetadata(spanset *Spanset) *tempopb.TraceSearchMet // add attributes for _, att := range spanset.Attributes { if att.Name == attributeMatched { - metadata.SpanSet.Matched = uint32(att.Val.N) + if n, ok := att.Val.Int(); ok { + metadata.SpanSet.Matched = uint32(n) + } continue } @@ -353,60 +355,45 @@ func unixSecToNano(ts uint32) uint64 { func (s Static) AsAnyValue() *common_v1.AnyValue { switch s.Type { case TypeInt: + n, _ := s.Int() return &common_v1.AnyValue{ Value: &common_v1.AnyValue_IntValue{ - IntValue: int64(s.N), - }, - } - case TypeString: - return &common_v1.AnyValue{ - Value: &common_v1.AnyValue_StringValue{ - StringValue: s.S, + IntValue: int64(n), }, } case TypeFloat: return &common_v1.AnyValue{ Value: &common_v1.AnyValue_DoubleValue{ - DoubleValue: s.F, + DoubleValue: s.Float(), }, } case TypeBoolean: + b, _ := s.Bool() return &common_v1.AnyValue{ Value: &common_v1.AnyValue_BoolValue{ - BoolValue: s.B, + BoolValue: b, }, } case TypeDuration: + d, _ := s.Duration() return &common_v1.AnyValue{ Value: &common_v1.AnyValue_StringValue{ - StringValue: s.D.String(), - }, - } - case TypeStatus: - return &common_v1.AnyValue{ - Value: &common_v1.AnyValue_StringValue{ - StringValue: s.Status.String(), + StringValue: d.String(), }, } - case TypeNil: + case TypeString, TypeStatus, TypeNil, TypeKind: return &common_v1.AnyValue{ Value: &common_v1.AnyValue_StringValue{ - StringValue: "nil", + StringValue: s.EncodeToString(false), }, } - case TypeKind: + default: return &common_v1.AnyValue{ Value: &common_v1.AnyValue_StringValue{ - StringValue: s.Kind.String(), + StringValue: fmt.Sprintf("error formatting val: static has unexpected type %v", s.Type), }, } } - - return &common_v1.AnyValue{ - Value: &common_v1.AnyValue_StringValue{ - StringValue: fmt.Sprintf("error formatting val: static has unexpected type %v", s.Type), - }, - } } func StaticFromAnyValue(a *common_v1.AnyValue) Static { diff --git a/pkg/traceql/engine_metrics.go b/pkg/traceql/engine_metrics.go index 62a21c55b90..f67fea5a310 100644 --- a/pkg/traceql/engine_metrics.go +++ b/pkg/traceql/engine_metrics.go @@ -104,8 +104,13 @@ func (ls Labels) String() string { switch { case l.Value.Type == TypeNil: promValue = "" - case l.Value.Type == TypeString && l.Value.S == "": - promValue = "" + case l.Value.Type == TypeString: + s := l.Value.EncodeToString(false) + if s != "" { + promValue = s + } else { + promValue = "" + } default: promValue = l.Value.EncodeToString(false) } @@ -280,15 +285,31 @@ const maxGroupBys = 5 // TODO - This isn't ideal but see comment below. // the maximum number of values. type ( - FastValues1 [1]Static - FastValues2 [2]Static - FastValues3 [3]Static - FastValues4 [4]Static - FastValues5 [5]Static + FastStatic1 [1]StaticMapKey + FastStatic2 [2]StaticMapKey + FastStatic3 [3]StaticMapKey + FastStatic4 [4]StaticMapKey + FastStatic5 [5]StaticMapKey +) + +type FastStatic interface { + FastStatic1 | FastStatic2 | FastStatic3 | FastStatic4 | FastStatic5 +} + +type ( + StaticVals1 [1]Static + StaticVals2 [2]Static + StaticVals3 [3]Static + StaticVals4 [4]Static + StaticVals5 [5]Static ) +type StaticVals interface { + StaticVals1 | StaticVals2 | StaticVals3 | StaticVals4 | StaticVals5 +} + // GroupingAggregator groups spans into series based on attribute values. -type GroupingAggregator[FV FastValues1 | FastValues2 | FastValues3 | FastValues4 | FastValues5] struct { +type GroupingAggregator[F FastStatic, S StaticVals] struct { // Config by []Attribute // Original attributes: .foo byLookups [][]Attribute // Lookups: span.foo resource.foo @@ -297,13 +318,23 @@ type GroupingAggregator[FV FastValues1 | FastValues2 | FastValues3 | FastValues4 innerAgg func() RangeAggregator // Data - series map[FV]RangeAggregator - lastSeries RangeAggregator - buf FV - lastBuf FV + series map[F]aggregatorWitValues[S] + lastSeries aggregatorWitValues[S] + buf fastStaticWithValues[F, S] + lastBuf fastStaticWithValues[F, S] +} + +type aggregatorWitValues[S StaticVals] struct { + agg RangeAggregator + vals S +} + +type fastStaticWithValues[F FastStatic, S StaticVals] struct { + fast F + vals S } -var _ SpanAggregator = (*GroupingAggregator[FastValues5])(nil) +var _ SpanAggregator = (*GroupingAggregator[FastStatic1, StaticVals1])(nil) func NewGroupingAggregator(aggName string, innerAgg func() RangeAggregator, by []Attribute, byFunc func(Span) (Static, bool), byFuncLabel string) SpanAggregator { if len(by) == 0 && byFunc == nil { @@ -334,23 +365,23 @@ func NewGroupingAggregator(aggName string, innerAgg func() RangeAggregator, by [ switch aggNum { case 1: - return newGroupingAggregator[FastValues1](innerAgg, by, byFunc, byFuncLabel, lookups) + return newGroupingAggregator[FastStatic1, StaticVals1](innerAgg, by, byFunc, byFuncLabel, lookups) case 2: - return newGroupingAggregator[FastValues2](innerAgg, by, byFunc, byFuncLabel, lookups) + return newGroupingAggregator[FastStatic2, StaticVals2](innerAgg, by, byFunc, byFuncLabel, lookups) case 3: - return newGroupingAggregator[FastValues3](innerAgg, by, byFunc, byFuncLabel, lookups) + return newGroupingAggregator[FastStatic3, StaticVals3](innerAgg, by, byFunc, byFuncLabel, lookups) case 4: - return newGroupingAggregator[FastValues4](innerAgg, by, byFunc, byFuncLabel, lookups) + return newGroupingAggregator[FastStatic4, StaticVals4](innerAgg, by, byFunc, byFuncLabel, lookups) case 5: - return newGroupingAggregator[FastValues5](innerAgg, by, byFunc, byFuncLabel, lookups) + return newGroupingAggregator[FastStatic5, StaticVals5](innerAgg, by, byFunc, byFuncLabel, lookups) default: panic("unsupported number of group-bys") } } -func newGroupingAggregator[FV FastValues1 | FastValues2 | FastValues3 | FastValues4 | FastValues5](innerAgg func() RangeAggregator, by []Attribute, byFunc func(Span) (Static, bool), byFuncLabel string, lookups [][]Attribute) SpanAggregator { - return &GroupingAggregator[FV]{ - series: map[FV]RangeAggregator{}, +func newGroupingAggregator[F FastStatic, S StaticVals](innerAgg func() RangeAggregator, by []Attribute, byFunc func(Span) (Static, bool), byFuncLabel string, lookups [][]Attribute) SpanAggregator { + return &GroupingAggregator[F, S]{ + series: map[F]aggregatorWitValues[S]{}, by: by, byFunc: byFunc, byFuncLabel: byFuncLabel, @@ -361,13 +392,15 @@ func newGroupingAggregator[FV FastValues1 | FastValues2 | FastValues3 | FastValu // Observe the span by looking up its group-by attributes, mapping to the series, // and passing to the inner aggregate. This is a critical hot path. -func (g *GroupingAggregator[FV]) Observe(span Span) { +func (g *GroupingAggregator[F, S]) Observe(span Span) { // Get grouping values // Reuse same buffer // There is no need to reset, the number of group-by attributes // is fixed after creation. for i, lookups := range g.byLookups { - g.buf[i] = lookup(lookups, span) + val := lookup(lookups, span) + g.buf.vals[i] = val + g.buf.fast[i] = val.MapKey() } // If dynamic label exists calculate and append it @@ -377,23 +410,25 @@ func (g *GroupingAggregator[FV]) Observe(span Span) { // Totally drop this span return } - g.buf[len(g.byLookups)] = v + g.buf.vals[len(g.byLookups)] = v + g.buf.fast[len(g.byLookups)] = v.MapKey() } - if g.lastSeries != nil && g.lastBuf == g.buf { - g.lastSeries.Observe(span) + if g.lastSeries.agg != nil && g.lastBuf.fast == g.buf.fast { + g.lastSeries.agg.Observe(span) return } - agg, ok := g.series[g.buf] + s, ok := g.series[g.buf.fast] if !ok { - agg = g.innerAgg() - g.series[g.buf] = agg + s.agg = g.innerAgg() + s.vals = g.buf.vals + g.series[g.buf.fast] = s } g.lastBuf = g.buf - g.lastSeries = agg - agg.Observe(span) + g.lastSeries = s + s.agg.Observe(span) } // labelsFor gives the final labels for the series. Slower and not on the hot path. @@ -425,7 +460,7 @@ func (g *GroupingAggregator[FV]) Observe(span Span) { // // Ex: rate() by (x,y,z) and all nil yields: // {x="nil"} -func (g *GroupingAggregator[FV]) labelsFor(vals FV) (Labels, string) { +func (g *GroupingAggregator[F, S]) labelsFor(vals S) (Labels, string) { labels := make(Labels, 0, len(g.by)+1) for i := range g.by { if vals[i].Type == TypeNil { @@ -445,15 +480,15 @@ func (g *GroupingAggregator[FV]) labelsFor(vals FV) (Labels, string) { return labels, labels.String() } -func (g *GroupingAggregator[FV]) Series() SeriesSet { +func (g *GroupingAggregator[F, S]) Series() SeriesSet { ss := SeriesSet{} - for vals, agg := range g.series { - labels, promLabels := g.labelsFor(vals) + for _, s := range g.series { + labels, promLabels := g.labelsFor(s.vals) ss[promLabels] = TimeSeries{ Labels: labels, - Values: agg.Samples(), + Values: s.agg.Samples(), } } @@ -680,7 +715,7 @@ func lookup(needles []Attribute, haystack Span) Static { } } - return Static{} + return NewStaticNil() } type MetricsEvalulator struct { @@ -999,7 +1034,7 @@ func (h *HistogramAggregator) Combine(in []*tempopb.TimeSeries) { h.ss[withoutBucketStr] = existing } - b := bucket.asFloat() + b := bucket.Float() for _, sample := range ts.Samples { if sample.Value == 0 { diff --git a/pkg/traceql/engine_metrics_compare.go b/pkg/traceql/engine_metrics_compare.go index 554591025e3..0c4162aa993 100644 --- a/pkg/traceql/engine_metrics_compare.go +++ b/pkg/traceql/engine_metrics_compare.go @@ -33,13 +33,18 @@ type MetricsCompare struct { len int start, end int topN int - baselines map[Attribute]map[Static][]float64 - selections map[Attribute]map[Static][]float64 + baselines map[Attribute]map[StaticMapKey]staticWithCounts + selections map[Attribute]map[StaticMapKey]staticWithCounts baselineTotals map[Attribute][]float64 selectionTotals map[Attribute][]float64 seriesAgg SeriesAggregator } +type staticWithCounts struct { + val Static + counts []float64 +} + func newMetricsCompare(f *SpansetFilter, topN, start, end int) *MetricsCompare { return &MetricsCompare{ f: f, @@ -65,8 +70,8 @@ func (m *MetricsCompare) init(q *tempopb.QueryRangeRequest, mode AggregateMode) m.qend = q.End m.qstep = q.Step m.len = IntervalCount(q.Start, q.End, q.Step) - m.baselines = make(map[Attribute]map[Static][]float64) - m.selections = make(map[Attribute]map[Static][]float64) + m.baselines = make(map[Attribute]map[StaticMapKey]staticWithCounts) + m.selections = make(map[Attribute]map[StaticMapKey]staticWithCounts) m.baselineTotals = make(map[Attribute][]float64) m.selectionTotals = make(map[Attribute][]float64) @@ -106,7 +111,7 @@ func (m *MetricsCompare) observe(span Span) { // Choose destination buffers dest := m.baselines destTotals := m.baselineTotals - if isSelection == StaticTrue { + if isSelection.Equals(&StaticTrue) { dest = m.selections destTotals = m.selectionTotals } @@ -130,16 +135,17 @@ func (m *MetricsCompare) observe(span Span) { values, ok := dest[a] if !ok { - values = make(map[Static][]float64, m.len) + values = make(map[StaticMapKey]staticWithCounts, m.len) dest[a] = values } - counts, ok := values[v] + vk := v.MapKey() + sc, ok := values[vk] if !ok { - counts = make([]float64, m.len) - values[v] = counts + sc = staticWithCounts{val: v, counts: make([]float64, m.len)} + values[vk] = sc } - counts[i]++ + sc.counts[i]++ // TODO - It's probably faster to aggregate these at the end // instead of incrementing in the hotpath twice @@ -175,19 +181,19 @@ func (m *MetricsCompare) result() SeriesSet { } } - addValues := func(prefix Label, data map[Attribute]map[Static][]float64) { + addValues := func(prefix Label, data map[Attribute]map[StaticMapKey]staticWithCounts) { for a, values := range data { // Compute topN values for this attribute top.reset() - for v, counts := range values { - top.add(v, counts) + for _, sc := range values { + top.add(sc.val, sc.counts) } top.get(m.topN, func(v Static) { add(Labels{ prefix, {Name: a.String(), Value: v}, - }, values[v]) + }, values[v.MapKey()].counts) }) if len(values) > m.topN { @@ -259,19 +265,24 @@ type BaselineAggregator struct { topN int len int start, end, step uint64 - baseline map[string]map[Static]TimeSeries - selection map[string]map[Static]TimeSeries - baselineTotals map[string]map[Static]TimeSeries - selectionTotals map[string]map[Static]TimeSeries + baseline map[string]map[StaticMapKey]staticWithTimeSeries + selection map[string]map[StaticMapKey]staticWithTimeSeries + baselineTotals map[string]map[StaticMapKey]staticWithTimeSeries + selectionTotals map[string]map[StaticMapKey]staticWithTimeSeries maxed map[string]struct{} } +type staticWithTimeSeries struct { + val Static + series TimeSeries +} + func NewBaselineAggregator(req *tempopb.QueryRangeRequest, topN int) *BaselineAggregator { return &BaselineAggregator{ - baseline: make(map[string]map[Static]TimeSeries), - selection: make(map[string]map[Static]TimeSeries), - baselineTotals: make(map[string]map[Static]TimeSeries), - selectionTotals: make(map[string]map[Static]TimeSeries), + baseline: make(map[string]map[StaticMapKey]staticWithTimeSeries), + selection: make(map[string]map[StaticMapKey]staticWithTimeSeries), + baselineTotals: make(map[string]map[StaticMapKey]staticWithTimeSeries), + selectionTotals: make(map[string]map[StaticMapKey]staticWithTimeSeries), maxed: make(map[string]struct{}), len: IntervalCount(req.Start, req.End, req.Step), start: req.Start, @@ -314,7 +325,7 @@ func (b *BaselineAggregator) Combine(ss []*tempopb.TimeSeries) { // Merge this time series into the destination buffer // based on meta type - var dest map[string]map[Static]TimeSeries + var dest map[string]map[StaticMapKey]staticWithTimeSeries switch metaType { case internalMetaTypeBaseline: dest = b.baseline @@ -331,16 +342,15 @@ func (b *BaselineAggregator) Combine(ss []*tempopb.TimeSeries) { attr, ok := dest[a] if !ok { - attr = make(map[Static]TimeSeries) + attr = make(map[StaticMapKey]staticWithTimeSeries) dest[a] = attr } - val, ok := attr[v] + vk := v.MapKey() + ts, ok := attr[vk] if !ok { - val = TimeSeries{ - Values: make([]float64, b.len), - } - attr[v] = val + ts = staticWithTimeSeries{val: v, series: TimeSeries{Values: make([]float64, b.len)}} + attr[vk] = ts } if len(attr) > b.topN { @@ -351,8 +361,8 @@ func (b *BaselineAggregator) Combine(ss []*tempopb.TimeSeries) { for _, sample := range s.Samples { j := IntervalOfMs(sample.TimestampMs, b.start, b.end, b.step) - if j >= 0 && j < len(val.Values) { - val.Values[j] += sample.Value + if j >= 0 && j < len(ts.series.Values) { + ts.series.Values[j] += sample.Value } } } @@ -373,16 +383,17 @@ func (b *BaselineAggregator) Results() SeriesSet { } } - do := func(buffer map[string]map[Static]TimeSeries, prefix Label) { + do := func(buffer map[string]map[StaticMapKey]staticWithTimeSeries, prefix Label) { for a, m := range buffer { topN.reset() - for v, ts := range m { - topN.add(v, ts.Values) + for _, ts := range m { + topN.add(ts.val, ts.series.Values) } topN.get(b.topN, func(key Static) { - addSeries(prefix, a, key, m[key].Values) + ts := m[key.MapKey()] + addSeries(prefix, a, key, ts.series.Values) }) } } diff --git a/pkg/traceql/enum_hints.go b/pkg/traceql/enum_hints.go index 4f08c004965..9d478258785 100644 --- a/pkg/traceql/enum_hints.go +++ b/pkg/traceql/enum_hints.go @@ -43,12 +43,13 @@ func newHints(h []*Hint) *Hints { func (h *Hints) GetFloat(k string, allowUnsafe bool) (v float64, ok bool) { if v, ok := h.Get(k, TypeFloat, allowUnsafe); ok { - return v.F, ok + return v.Float(), ok } // If float not found, then try integer. if v, ok := h.Get(k, TypeInt, allowUnsafe); ok { - return float64(v.N), ok + n, _ := v.Int() + return float64(n), ok } return @@ -56,7 +57,8 @@ func (h *Hints) GetFloat(k string, allowUnsafe bool) (v float64, ok bool) { func (h *Hints) GetInt(k string, allowUnsafe bool) (v int, ok bool) { if v, ok := h.Get(k, TypeInt, allowUnsafe); ok { - return v.N, ok + n, _ := v.Int() + return n, ok } return @@ -64,7 +66,8 @@ func (h *Hints) GetInt(k string, allowUnsafe bool) (v int, ok bool) { func (h *Hints) GetDuration(k string, allowUnsafe bool) (v time.Duration, ok bool) { if v, ok := h.Get(k, TypeDuration, allowUnsafe); ok { - return v.D, ok + d, _ := v.Duration() + return d, ok } return @@ -72,7 +75,8 @@ func (h *Hints) GetDuration(k string, allowUnsafe bool) (v time.Duration, ok boo func (h *Hints) GetBool(k string, allowUnsafe bool) (v, ok bool) { if v, ok := h.Get(k, TypeBoolean, allowUnsafe); ok { - return v.B, ok + b, _ := v.Bool() + return b, ok } return diff --git a/pkg/traceql/enum_statics.go b/pkg/traceql/enum_statics.go index c4b5be4997f..df5c93e794b 100644 --- a/pkg/traceql/enum_statics.go +++ b/pkg/traceql/enum_statics.go @@ -12,6 +12,10 @@ const ( TypeFloat TypeString TypeBoolean + TypeIntArray + TypeFloatArray + TypeStringArray + TypeBooleanArray TypeDuration TypeStatus TypeKind diff --git a/pkg/traceql/util.go b/pkg/traceql/util.go index 745ac9b14df..297eb05a969 100644 --- a/pkg/traceql/util.go +++ b/pkg/traceql/util.go @@ -9,7 +9,7 @@ func MakeCollectTagValueFunc(collect func(tempopb.TagValue) bool) func(v Static) switch v.Type { case TypeString: tv.Type = "string" - tv.Value = v.S // avoid formatting + tv.Value = v.EncodeToString(false) // avoid formatting case TypeBoolean: tv.Type = "bool" diff --git a/pkg/traceqlmetrics/metrics.go b/pkg/traceqlmetrics/metrics.go index d5cd16122fa..3798f6a078a 100644 --- a/pkg/traceqlmetrics/metrics.go +++ b/pkg/traceqlmetrics/metrics.go @@ -108,30 +108,53 @@ type KeyValue struct { type MetricSeries [maxGroupBys]KeyValue +func (ms *MetricSeries) MetricKeys() MetricKeys { + var keys MetricKeys + for i, kv := range ms { + keys[i] = MetricKey{Key: kv.Key, StaticKey: kv.Value.MapKey()} + } + return keys +} + +type MetricKey struct { + Key string + StaticKey traceql.StaticMapKey +} + +type MetricKeys [maxGroupBys]MetricKey + type MetricsResults struct { Estimated bool SpanCount int - Series map[MetricSeries]*LatencyHistogram - Errors map[MetricSeries]int + Series map[MetricKeys]*SeriesHistogram + Errors map[MetricKeys]int +} + +type SeriesHistogram struct { + Series MetricSeries + Histogram LatencyHistogram } func NewMetricsResults() *MetricsResults { return &MetricsResults{ - Series: map[MetricSeries]*LatencyHistogram{}, - Errors: map[MetricSeries]int{}, + Series: map[MetricKeys]*SeriesHistogram{}, + Errors: map[MetricKeys]int{}, } } func (m *MetricsResults) Record(series MetricSeries, durationNanos uint64, err bool) { - s := m.Series[series] - if s == nil { - s = &LatencyHistogram{} - m.Series[series] = s + keys := series.MetricKeys() + + sh := m.Series[keys] + if sh == nil { + sh = &SeriesHistogram{Series: series} + m.Series[keys] = sh } - s.Record(durationNanos) + + sh.Histogram.Record(durationNanos) if err { - m.Errors[series]++ + m.Errors[keys]++ } } @@ -142,12 +165,12 @@ func (m *MetricsResults) Combine(other *MetricsResults) { } for k, v := range other.Series { - s := m.Series[k] - if s == nil { - s = &LatencyHistogram{} - m.Series[k] = s + sh := m.Series[k] + if sh == nil { + sh = &SeriesHistogram{Series: v.Series} + m.Series[k] = sh } - s.Combine(*v) + sh.Histogram.Combine(v.Histogram) } for k, v := range other.Errors { @@ -283,11 +306,12 @@ func GetMetrics(ctx context.Context, query, groupBy string, spanLimit int, start var ( series = MetricSeries{} status, _ = s.AttributeFor(status) - err = status == statusErr + err = status.Equals(&statusErr) ) for i, g := range groupBys { - series[i] = KeyValue{Key: groupByKeys[i], Value: lookup(g, s)} + static := lookup(g, s) + series[i] = KeyValue{Key: groupByKeys[i], Value: static} } results.Record(series, s.DurationNanos(), err) @@ -314,5 +338,5 @@ func lookup(needles []traceql.Attribute, span traceql.Span) traceql.Static { } } - return traceql.Static{} + return traceql.NewStaticNil() } diff --git a/pkg/traceqlmetrics/metrics_test.go b/pkg/traceqlmetrics/metrics_test.go index b3d219fc498..595529e319c 100644 --- a/pkg/traceqlmetrics/metrics_test.go +++ b/pkg/traceqlmetrics/metrics_test.go @@ -66,8 +66,13 @@ func TestPercentile(t *testing.T) { func TestMetricsResultsCombine(t *testing.T) { a := MetricSeries{KeyValue{Key: "x", Value: traceql.NewStaticString("1")}} + ak := a.MetricKeys() + b := MetricSeries{KeyValue{Key: "x", Value: traceql.NewStaticString("2")}} + bk := b.MetricKeys() + c := MetricSeries{KeyValue{Key: "x", Value: traceql.NewStaticString("3")}} + ck := c.MetricKeys() m := NewMetricsResults() m.Record(a, 1, true) @@ -86,13 +91,13 @@ func TestMetricsResultsCombine(t *testing.T) { require.Equal(t, 3, len(m.Series)) require.Equal(t, 3, len(m.Errors)) - require.Equal(t, 1, m.Series[a].Count()) - require.Equal(t, 4, m.Series[b].Count()) - require.Equal(t, 3, m.Series[c].Count()) + require.Equal(t, 1, m.Series[ak].Histogram.Count()) + require.Equal(t, 4, m.Series[bk].Histogram.Count()) + require.Equal(t, 3, m.Series[ck].Histogram.Count()) - require.Equal(t, 1, m.Errors[a]) - require.Equal(t, 2, m.Errors[b]) - require.Equal(t, 1, m.Errors[c]) + require.Equal(t, 1, m.Errors[ak]) + require.Equal(t, 2, m.Errors[bk]) + require.Equal(t, 1, m.Errors[ck]) } func TestGetMetrics(t *testing.T) { @@ -126,21 +131,24 @@ func TestGetMetrics(t *testing.T) { require.NotNil(t, res) one := MetricSeries{KeyValue{Key: "span.foo", Value: traceql.NewStaticString("1")}} + oneK := one.MetricKeys() + two := MetricSeries{KeyValue{Key: "span.foo", Value: traceql.NewStaticString("2")}} + twoK := two.MetricKeys() - require.Equal(t, 0, res.Errors[one]) - require.Equal(t, 1, res.Errors[two]) + require.Equal(t, 0, res.Errors[oneK]) + require.Equal(t, 1, res.Errors[twoK]) - require.NotNil(t, res.Series[one]) - require.NotNil(t, res.Series[two]) + require.NotNil(t, res.Series[oneK]) + require.NotNil(t, res.Series[twoK]) - require.Equal(t, uint64(128), res.Series[one].Percentile(0.5)) // p50 - require.Equal(t, uint64(181), res.Series[one].Percentile(0.75)) // p75, 128 * 2^0.5 = 181 - require.Equal(t, uint64(256), res.Series[one].Percentile(1.0)) // p100 + require.Equal(t, uint64(128), res.Series[oneK].Histogram.Percentile(0.5)) // p50 + require.Equal(t, uint64(181), res.Series[oneK].Histogram.Percentile(0.75)) // p75, 128 * 2^0.5 = 181 + require.Equal(t, uint64(256), res.Series[oneK].Histogram.Percentile(1.0)) // p100 - require.Equal(t, uint64(256), res.Series[two].Percentile(0.5)) // p50 - require.Equal(t, uint64(362), res.Series[two].Percentile(0.75)) // p75, 256 * 2^0.5 = 362 - require.Equal(t, uint64(512), res.Series[two].Percentile(1.0)) // p100 + require.Equal(t, uint64(256), res.Series[twoK].Histogram.Percentile(0.5)) // p50 + require.Equal(t, uint64(362), res.Series[twoK].Histogram.Percentile(0.75)) // p75, 256 * 2^0.5 = 362 + require.Equal(t, uint64(512), res.Series[twoK].Histogram.Percentile(1.0)) // p100 } func TestGetMetricsTimeRange(t *testing.T) { @@ -170,8 +178,11 @@ func TestGetMetricsTimeRange(t *testing.T) { require.NotNil(t, res) one := MetricSeries{KeyValue{Key: "span.foo", Value: traceql.NewStaticString("1")}} + oneK := one.MetricKeys() + two := MetricSeries{KeyValue{Key: "span.foo", Value: traceql.NewStaticString("2")}} + twoK := two.MetricKeys() - require.Equal(t, uint64(128), res.Series[one].Percentile(1.0)) // Highest span - require.Equal(t, uint64(512), res.Series[two].Percentile(1.0)) // Highest span + require.Equal(t, uint64(128), res.Series[oneK].Histogram.Percentile(1.0)) // Highest span + require.Equal(t, uint64(512), res.Series[twoK].Histogram.Percentile(1.0)) // Highest span } diff --git a/tempodb/encoding/vparquet2/block_traceql.go b/tempodb/encoding/vparquet2/block_traceql.go index e042f384efd..626beefd239 100644 --- a/tempodb/encoding/vparquet2/block_traceql.go +++ b/tempodb/encoding/vparquet2/block_traceql.go @@ -613,7 +613,7 @@ func putSpanset(ss *traceql.Spanset) { ss.DurationNanos = 0 ss.RootServiceName = "" ss.RootSpanName = "" - ss.Scalar = traceql.Static{} + ss.Scalar = traceql.NewStaticNil() ss.StartTimeUnixNanos = 0 ss.TraceID = nil ss.Spans = ss.Spans[:0] @@ -1652,7 +1652,7 @@ func createPredicate(op traceql.Operator, operands traceql.Operands) (parquetque case traceql.TypeBoolean: return createBoolPredicate(op, operands) default: - return nil, fmt.Errorf("cannot create predicate for operand: %v", operands[0]) + return nil, fmt.Errorf("cannot create predicate for operand: %s", operands[0].EncodeToString(false)) } } @@ -1661,14 +1661,11 @@ func createStringPredicate(op traceql.Operator, operands traceql.Operands) (parq return nil, nil } - for _, op := range operands { - if op.Type != traceql.TypeString { - return nil, fmt.Errorf("operand is not string: %+v", op) - } + s := operands[0].EncodeToString(false) + if operands[0].Type != traceql.TypeString { + return nil, fmt.Errorf("operand is not string: %s", s) } - s := operands[0].S - switch op { case traceql.OpEqual: return parquetquery.NewStringEqualPredicate([]byte(s)), nil @@ -1687,7 +1684,7 @@ func createStringPredicate(op traceql.Operator, operands traceql.Operands) (parq case traceql.OpLessEqual: return parquetquery.NewStringLessEqualPredicate([]byte(s)), nil default: - return nil, fmt.Errorf("operand not supported for strings: %+v", op) + return nil, fmt.Errorf("operator not supported for strings: %+v", op) } } @@ -1696,14 +1693,11 @@ func createBytesPredicate(op traceql.Operator, operands traceql.Operands, isSpan return nil, nil } - for _, op := range operands { - if op.Type != traceql.TypeString { - return nil, fmt.Errorf("operand is not string: %+v", op) - } + s := operands[0].EncodeToString(false) + if operands[0].Type != traceql.TypeString { + return nil, fmt.Errorf("operand is not string: %s", s) } - s := operands[0].S - var id []byte id, err := util.HexStringToTraceID(s) if isSpan { @@ -1722,7 +1716,7 @@ func createBytesPredicate(op traceql.Operator, operands traceql.Operands, isSpan case traceql.OpNotEqual: return parquetquery.NewStringNotEqualPredicate(id), nil default: - return nil, fmt.Errorf("operand not supported for IDs: %+v", op) + return nil, fmt.Errorf("operator not supported for IDs: %+v", op) } } @@ -1734,15 +1728,19 @@ func createIntPredicate(op traceql.Operator, operands traceql.Operands) (parquet var i int64 switch operands[0].Type { case traceql.TypeInt: - i = int64(operands[0].N) + n, _ := operands[0].Int() + i = int64(n) case traceql.TypeDuration: - i = operands[0].D.Nanoseconds() + d, _ := operands[0].Duration() + i = d.Nanoseconds() case traceql.TypeStatus: - i = int64(StatusCodeMapping[operands[0].Status.String()]) + st, _ := operands[0].Status() + i = int64(StatusCodeMapping[st.String()]) case traceql.TypeKind: - i = int64(KindMapping[operands[0].Kind.String()]) + k, _ := operands[0].Kind() + i = int64(KindMapping[k.String()]) default: - return nil, fmt.Errorf("operand is not int, duration, status or kind: %+v", operands[0]) + return nil, fmt.Errorf("operand is not int, duration, status or kind: %s", operands[0].EncodeToString(false)) } switch op { @@ -1759,7 +1757,7 @@ func createIntPredicate(op traceql.Operator, operands traceql.Operands) (parquet case traceql.OpLessEqual: return parquetquery.NewIntLessEqualPredicate(i), nil default: - return nil, fmt.Errorf("operand not supported for integers: %+v", op) + return nil, fmt.Errorf("operator not supported for integers: %+v", op) } } @@ -1770,26 +1768,26 @@ func createFloatPredicate(op traceql.Operator, operands traceql.Operands) (parqu // Ensure operand is float if operands[0].Type != traceql.TypeFloat { - return nil, fmt.Errorf("operand is not float: %+v", operands[0]) + return nil, fmt.Errorf("operand is not float: %s", operands[0].EncodeToString(false)) } - i := operands[0].F + f := operands[0].Float() switch op { case traceql.OpEqual: - return parquetquery.NewFloatEqualPredicate(i), nil + return parquetquery.NewFloatEqualPredicate(f), nil case traceql.OpNotEqual: - return parquetquery.NewFloatNotEqualPredicate(i), nil + return parquetquery.NewFloatNotEqualPredicate(f), nil case traceql.OpGreater: - return parquetquery.NewFloatGreaterPredicate(i), nil + return parquetquery.NewFloatGreaterPredicate(f), nil case traceql.OpGreaterEqual: - return parquetquery.NewFloatGreaterEqualPredicate(i), nil + return parquetquery.NewFloatGreaterEqualPredicate(f), nil case traceql.OpLess: - return parquetquery.NewFloatLessPredicate(i), nil + return parquetquery.NewFloatLessPredicate(f), nil case traceql.OpLessEqual: - return parquetquery.NewFloatLessEqualPredicate(i), nil + return parquetquery.NewFloatLessEqualPredicate(f), nil default: - return nil, fmt.Errorf("operand not supported for floats: %+v", op) + return nil, fmt.Errorf("operator not supported for floats: %+v", op) } } @@ -1799,17 +1797,18 @@ func createBoolPredicate(op traceql.Operator, operands traceql.Operands) (parque } // Ensure operand is bool - if operands[0].Type != traceql.TypeBoolean { - return nil, fmt.Errorf("operand is not bool: %+v", operands[0]) + b, ok := operands[0].Bool() + if !ok { + return nil, fmt.Errorf("oparand is not bool: %+v", operands[0].EncodeToString(false)) } switch op { case traceql.OpEqual: - return parquetquery.NewBoolEqualPredicate(operands[0].B), nil + return parquetquery.NewBoolEqualPredicate(b), nil case traceql.OpNotEqual: - return parquetquery.NewBoolNotEqualPredicate(operands[0].B), nil + return parquetquery.NewBoolNotEqualPredicate(b), nil default: - return nil, fmt.Errorf("operand not supported for booleans: %+v", op) + return nil, fmt.Errorf("operator not supported for booleans: %+v", op) } } diff --git a/tempodb/encoding/vparquet3/block_autocomplete.go b/tempodb/encoding/vparquet3/block_autocomplete.go index a8d08174380..89421a3182f 100644 --- a/tempodb/encoding/vparquet3/block_autocomplete.go +++ b/tempodb/encoding/vparquet3/block_autocomplete.go @@ -743,14 +743,14 @@ type distinctAttrCollector struct { scope traceql.AttributeScope attrNames bool - sentVals map[traceql.Static]struct{} + sentVals map[traceql.StaticMapKey]struct{} sentKeys map[string]struct{} } func newDistinctAttrCollector(scope traceql.AttributeScope, attrNames bool) *distinctAttrCollector { return &distinctAttrCollector{ scope: scope, - sentVals: make(map[traceql.Static]struct{}), + sentVals: make(map[traceql.StaticMapKey]struct{}), sentKeys: make(map[string]struct{}), attrNames: attrNames, } @@ -792,11 +792,11 @@ func (d *distinctAttrCollector) KeepGroup(result *parquetquery.IteratorResult) b } } - var empty traceql.Static - if val != empty { - if _, ok := d.sentVals[val]; !ok { + if val.Type != traceql.TypeNil { + mk := val.MapKey() + if _, ok := d.sentVals[mk]; !ok { result.AppendOtherValue("", val) - d.sentVals[val] = struct{}{} + d.sentVals[mk] = struct{}{} } } @@ -814,14 +814,14 @@ var _ parquetquery.GroupPredicate = (*distinctValueCollector)(nil) type distinctValueCollector struct { mapToStatic func(entry) traceql.Static - sentVals map[traceql.Static]struct{} + sentVals map[traceql.StaticMapKey]struct{} name string } func newDistinctValueCollector(mapToStatic func(entry) traceql.Static, name string) *distinctValueCollector { return &distinctValueCollector{ mapToStatic: mapToStatic, - sentVals: make(map[traceql.Static]struct{}), + sentVals: make(map[traceql.StaticMapKey]struct{}), name: name, } } @@ -835,9 +835,10 @@ func (d distinctValueCollector) KeepGroup(result *parquetquery.IteratorResult) b } static := d.mapToStatic(e) - if _, ok := d.sentVals[static]; !ok { + mk := static.MapKey() + if _, ok := d.sentVals[mk]; !ok { result.AppendOtherValue("", static) - d.sentVals[static] = struct{}{} + d.sentVals[mk] = struct{}{} } } result.Entries = result.Entries[:0] @@ -904,7 +905,7 @@ func mapSpanAttr(e entry) traceql.Static { return traceql.NewStaticString(unsafeToString(e.Value.ByteArray())) } } - return traceql.Static{} + return traceql.NewStaticNil() } func mapResourceAttr(e entry) traceql.Static { @@ -918,7 +919,7 @@ func mapResourceAttr(e entry) traceql.Static { case parquet.ByteArray, parquet.FixedLenByteArray: return traceql.NewStaticString(unsafeToString(e.Value.ByteArray())) default: - return traceql.Static{} + return traceql.NewStaticNil() } } @@ -932,7 +933,7 @@ func mapTraceAttr(e entry) traceql.Static { case columnPathRootServiceName: return traceql.NewStaticString(unsafeToString(e.Value.ByteArray())) } - return traceql.Static{} + return traceql.NewStaticNil() } func scopeFromDefinitionLevel(lvl int) traceql.AttributeScope { diff --git a/tempodb/encoding/vparquet3/block_traceql.go b/tempodb/encoding/vparquet3/block_traceql.go index ea1a837fc7b..f247f18ce50 100644 --- a/tempodb/encoding/vparquet3/block_traceql.go +++ b/tempodb/encoding/vparquet3/block_traceql.go @@ -142,25 +142,25 @@ func (s *span) AttributeFor(a traceql.Attribute) (traceql.Static, bool) { if attr := find(a, s.resourceAttrs); attr != nil { return *attr, true } - return traceql.Static{}, false + return traceql.NewStaticNil(), false } if a.Scope == traceql.AttributeScopeSpan { if attr := find(a, s.spanAttrs); attr != nil { return *attr, true } - return traceql.Static{}, false + return traceql.NewStaticNil(), false } if a.Intrinsic != traceql.IntrinsicNone { if a.Intrinsic == traceql.IntrinsicNestedSetLeft { - return traceql.Static{Type: traceql.TypeInt, N: int(s.nestedSetLeft)}, true + return traceql.NewStaticInt(int(s.nestedSetLeft)), true } if a.Intrinsic == traceql.IntrinsicNestedSetRight { - return traceql.Static{Type: traceql.TypeInt, N: int(s.nestedSetRight)}, true + return traceql.NewStaticInt(int(s.nestedSetRight)), true } if a.Intrinsic == traceql.IntrinsicNestedSetParent { - return traceql.Static{Type: traceql.TypeInt, N: int(s.nestedSetParent)}, true + return traceql.NewStaticInt(int(s.nestedSetParent)), true } // intrinsics are always on the span or trace ... for now @@ -183,7 +183,7 @@ func (s *span) AttributeFor(a traceql.Attribute) (traceql.Static, bool) { return *attr, true } - return traceql.Static{}, false + return traceql.NewStaticNil(), false } func (s *span) ID() []byte { @@ -747,7 +747,7 @@ func putSpanset(ss *traceql.Spanset) { ss.DurationNanos = 0 ss.RootServiceName = "" ss.RootSpanName = "" - ss.Scalar = traceql.Static{} + ss.Scalar = traceql.NewStaticNil() ss.StartTimeUnixNanos = 0 ss.TraceID = nil ss.Spans = ss.Spans[:0] @@ -1974,14 +1974,11 @@ func createStringPredicate(op traceql.Operator, operands traceql.Operands) (parq return nil, nil } - for _, op := range operands { - if op.Type != traceql.TypeString { - return nil, fmt.Errorf("operand is not string: %+v", op) - } + s := operands[0].EncodeToString(false) + if operands[0].Type != traceql.TypeString { + return nil, fmt.Errorf("operand is not string: %s", s) } - s := operands[0].S - switch op { case traceql.OpEqual: return parquetquery.NewStringEqualPredicate([]byte(s)), nil @@ -2000,7 +1997,7 @@ func createStringPredicate(op traceql.Operator, operands traceql.Operands) (parq case traceql.OpLessEqual: return parquetquery.NewStringLessEqualPredicate([]byte(s)), nil default: - return nil, fmt.Errorf("operand not supported for strings: %+v", op) + return nil, fmt.Errorf("operator not supported for strings: %+v", op) } } @@ -2009,14 +2006,11 @@ func createBytesPredicate(op traceql.Operator, operands traceql.Operands, isSpan return nil, nil } - for _, op := range operands { - if op.Type != traceql.TypeString { - return nil, fmt.Errorf("operand is not string: %+v", op) - } + s := operands[0].EncodeToString(false) + if operands[0].Type != traceql.TypeString { + return nil, fmt.Errorf("operand is not string: %s", s) } - s := operands[0].S - var id []byte id, err := util.HexStringToTraceID(s) if isSpan { @@ -2035,7 +2029,7 @@ func createBytesPredicate(op traceql.Operator, operands traceql.Operands, isSpan case traceql.OpNotEqual: return parquetquery.NewByteNotEqualPredicate(id), nil default: - return nil, fmt.Errorf("operand not supported for IDs: %+v", op) + return nil, fmt.Errorf("operator not supported for IDs: %+v", op) } } @@ -2047,15 +2041,19 @@ func createIntPredicate(op traceql.Operator, operands traceql.Operands) (parquet var i int64 switch operands[0].Type { case traceql.TypeInt: - i = int64(operands[0].N) + n, _ := operands[0].Int() + i = int64(n) case traceql.TypeDuration: - i = operands[0].D.Nanoseconds() + d, _ := operands[0].Duration() + i = d.Nanoseconds() case traceql.TypeStatus: - i = int64(StatusCodeMapping[operands[0].Status.String()]) + st, _ := operands[0].Status() + i = int64(StatusCodeMapping[st.String()]) case traceql.TypeKind: - i = int64(KindMapping[operands[0].Kind.String()]) + k, _ := operands[0].Kind() + i = int64(KindMapping[k.String()]) default: - return nil, fmt.Errorf("operand is not int, duration, status or kind: %+v", operands[0]) + return nil, fmt.Errorf("operand is not int, duration, status or kind: %s", operands[0].EncodeToString(false)) } switch op { @@ -2072,7 +2070,7 @@ func createIntPredicate(op traceql.Operator, operands traceql.Operands) (parquet case traceql.OpLessEqual: return parquetquery.NewIntLessEqualPredicate(i), nil default: - return nil, fmt.Errorf("operand not supported for integers: %+v", op) + return nil, fmt.Errorf("operator not supported for integers: %+v", op) } } @@ -2083,26 +2081,26 @@ func createFloatPredicate(op traceql.Operator, operands traceql.Operands) (parqu // Ensure operand is float if operands[0].Type != traceql.TypeFloat { - return nil, fmt.Errorf("operand is not float: %+v", operands[0]) + return nil, fmt.Errorf("operand is not float: %s", operands[0].EncodeToString(false)) } - i := operands[0].F + f := operands[0].Float() switch op { case traceql.OpEqual: - return parquetquery.NewFloatEqualPredicate(i), nil + return parquetquery.NewFloatEqualPredicate(f), nil case traceql.OpNotEqual: - return parquetquery.NewFloatNotEqualPredicate(i), nil + return parquetquery.NewFloatNotEqualPredicate(f), nil case traceql.OpGreater: - return parquetquery.NewFloatGreaterPredicate(i), nil + return parquetquery.NewFloatGreaterPredicate(f), nil case traceql.OpGreaterEqual: - return parquetquery.NewFloatGreaterEqualPredicate(i), nil + return parquetquery.NewFloatGreaterEqualPredicate(f), nil case traceql.OpLess: - return parquetquery.NewFloatLessPredicate(i), nil + return parquetquery.NewFloatLessPredicate(f), nil case traceql.OpLessEqual: - return parquetquery.NewFloatLessEqualPredicate(i), nil + return parquetquery.NewFloatLessEqualPredicate(f), nil default: - return nil, fmt.Errorf("operand not supported for floats: %+v", op) + return nil, fmt.Errorf("operator not supported for floats: %+v", op) } } @@ -2112,17 +2110,18 @@ func createBoolPredicate(op traceql.Operator, operands traceql.Operands) (parque } // Ensure operand is bool - if operands[0].Type != traceql.TypeBoolean { - return nil, fmt.Errorf("operand is not bool: %+v", operands[0]) + b, ok := operands[0].Bool() + if !ok { + return nil, fmt.Errorf("oparand is not bool: %+v", operands[0].EncodeToString(false)) } switch op { case traceql.OpEqual: - return parquetquery.NewBoolEqualPredicate(operands[0].B), nil + return parquetquery.NewBoolEqualPredicate(b), nil case traceql.OpNotEqual: - return parquetquery.NewBoolNotEqualPredicate(operands[0].B), nil + return parquetquery.NewBoolNotEqualPredicate(b), nil default: - return nil, fmt.Errorf("operand not supported for booleans: %+v", op) + return nil, fmt.Errorf("operator not supported for booleans: %+v", op) } } diff --git a/tempodb/encoding/vparquet3/coalesce_conditions.go b/tempodb/encoding/vparquet3/coalesce_conditions.go index c575667f0f9..4990c6cb01a 100644 --- a/tempodb/encoding/vparquet3/coalesce_conditions.go +++ b/tempodb/encoding/vparquet3/coalesce_conditions.go @@ -74,7 +74,7 @@ func operandsEqual(c1 traceql.Condition, c2 traceql.Condition) bool { // todo: sort first? for i := 0; i < len(c1.Operands); i++ { - if c1.Operands[i] != c2.Operands[i] { + if !c1.Operands[i].StrictEquals(&c2.Operands[i]) { return false } } diff --git a/tempodb/encoding/vparquet4/block_autocomplete.go b/tempodb/encoding/vparquet4/block_autocomplete.go index 5464f8477fe..6c470ba73d5 100644 --- a/tempodb/encoding/vparquet4/block_autocomplete.go +++ b/tempodb/encoding/vparquet4/block_autocomplete.go @@ -901,14 +901,14 @@ type distinctAttrCollector struct { scope traceql.AttributeScope attrNames bool - sentVals map[traceql.Static]struct{} + sentVals map[traceql.StaticMapKey]struct{} sentKeys map[string]struct{} } func newDistinctAttrCollector(scope traceql.AttributeScope, attrNames bool) *distinctAttrCollector { return &distinctAttrCollector{ scope: scope, - sentVals: make(map[traceql.Static]struct{}), + sentVals: make(map[traceql.StaticMapKey]struct{}), sentKeys: make(map[string]struct{}), attrNames: attrNames, } @@ -950,11 +950,11 @@ func (d *distinctAttrCollector) KeepGroup(result *parquetquery.IteratorResult) b } } - var empty traceql.Static - if val != empty { - if _, ok := d.sentVals[val]; !ok { + if val.Type != traceql.TypeNil { + mk := val.MapKey() + if _, ok := d.sentVals[mk]; !ok { result.AppendOtherValue("", val) - d.sentVals[val] = struct{}{} + d.sentVals[mk] = struct{}{} } } @@ -972,14 +972,14 @@ var _ parquetquery.GroupPredicate = (*distinctValueCollector)(nil) type distinctValueCollector struct { mapToStatic func(entry) traceql.Static - sentVals map[traceql.Static]struct{} + sentVals map[traceql.StaticMapKey]struct{} name string } func newDistinctValueCollector(mapToStatic func(entry) traceql.Static, name string) *distinctValueCollector { return &distinctValueCollector{ mapToStatic: mapToStatic, - sentVals: make(map[traceql.Static]struct{}), + sentVals: make(map[traceql.StaticMapKey]struct{}), name: name, } } @@ -993,9 +993,10 @@ func (d distinctValueCollector) KeepGroup(result *parquetquery.IteratorResult) b } static := d.mapToStatic(e) - if _, ok := d.sentVals[static]; !ok { + mk := static.MapKey() + if _, ok := d.sentVals[mk]; !ok { result.AppendOtherValue("", static) - d.sentVals[static] = struct{}{} + d.sentVals[mk] = struct{}{} } } result.Entries = result.Entries[:0] @@ -1074,7 +1075,7 @@ func mapSpanAttr(e entry) traceql.Static { return traceql.NewStaticString(unsafeToString(e.Value.ByteArray())) } } - return traceql.Static{} + return traceql.NewStaticNil() } func mapResourceAttr(e entry) traceql.Static { @@ -1088,7 +1089,7 @@ func mapResourceAttr(e entry) traceql.Static { case parquet.ByteArray, parquet.FixedLenByteArray: return traceql.NewStaticString(unsafeToString(e.Value.ByteArray())) default: - return traceql.Static{} + return traceql.NewStaticNil() } } @@ -1102,7 +1103,7 @@ func mapTraceAttr(e entry) traceql.Static { case columnPathRootServiceName: return traceql.NewStaticString(unsafeToString(e.Value.ByteArray())) } - return traceql.Static{} + return traceql.NewStaticNil() } func scopeFromDefinitionLevel(lvl int) traceql.AttributeScope { diff --git a/tempodb/encoding/vparquet4/block_traceql.go b/tempodb/encoding/vparquet4/block_traceql.go index f2587930a4c..352fb9009d7 100644 --- a/tempodb/encoding/vparquet4/block_traceql.go +++ b/tempodb/encoding/vparquet4/block_traceql.go @@ -158,38 +158,38 @@ func (s *span) AttributeFor(a traceql.Attribute) (traceql.Static, bool) { if attr := find(a, s.resourceAttrs); attr != nil { return *attr, true } - return traceql.Static{}, false + return traceql.NewStaticNil(), false } if a.Scope == traceql.AttributeScopeSpan { if attr := find(a, s.spanAttrs); attr != nil { return *attr, true } - return traceql.Static{}, false + return traceql.NewStaticNil(), false } if a.Scope == traceql.AttributeScopeEvent { if attr := find(a, s.eventAttrs); attr != nil { return *attr, true } - return traceql.Static{}, false + return traceql.NewStaticNil(), false } if a.Scope == traceql.AttributeScopeLink { if attr := find(a, s.linkAttrs); attr != nil { return *attr, true } - return traceql.Static{}, false + return traceql.NewStaticNil(), false } if a.Intrinsic != traceql.IntrinsicNone { if a.Intrinsic == traceql.IntrinsicNestedSetLeft { - return traceql.Static{Type: traceql.TypeInt, N: int(s.nestedSetLeft)}, true + return traceql.NewStaticInt(int(s.nestedSetLeft)), true } if a.Intrinsic == traceql.IntrinsicNestedSetRight { - return traceql.Static{Type: traceql.TypeInt, N: int(s.nestedSetRight)}, true + return traceql.NewStaticInt(int(s.nestedSetRight)), true } if a.Intrinsic == traceql.IntrinsicNestedSetParent { - return traceql.Static{Type: traceql.TypeInt, N: int(s.nestedSetParent)}, true + return traceql.NewStaticInt(int(s.nestedSetParent)), true } // intrinsics are always on the span, trace, event, or link ... for now @@ -228,7 +228,7 @@ func (s *span) AttributeFor(a traceql.Attribute) (traceql.Static, bool) { return *attr, true } - return traceql.Static{}, false + return traceql.NewStaticNil(), false } func (s *span) ID() []byte { @@ -812,7 +812,7 @@ func putSpanset(ss *traceql.Spanset) { ss.DurationNanos = 0 ss.RootServiceName = "" ss.RootSpanName = "" - ss.Scalar = traceql.Static{} + ss.Scalar = traceql.NewStaticNil() ss.StartTimeUnixNanos = 0 ss.TraceID = nil clear(ss.ServiceStats) @@ -2268,14 +2268,11 @@ func createStringPredicate(op traceql.Operator, operands traceql.Operands) (parq return nil, nil } - for _, op := range operands { - if op.Type != traceql.TypeString { - return nil, fmt.Errorf("operand is not string: %+v", op) - } + s := operands[0].EncodeToString(false) + if operands[0].Type != traceql.TypeString { + return nil, fmt.Errorf("operand is not string: %s", s) } - s := operands[0].S - switch op { case traceql.OpEqual: return parquetquery.NewStringEqualPredicate([]byte(s)), nil @@ -2294,7 +2291,7 @@ func createStringPredicate(op traceql.Operator, operands traceql.Operands) (parq case traceql.OpLessEqual: return parquetquery.NewStringLessEqualPredicate([]byte(s)), nil default: - return nil, fmt.Errorf("operand not supported for strings: %+v", op) + return nil, fmt.Errorf("operator not supported for strings: %+v", op) } } @@ -2303,14 +2300,11 @@ func createBytesPredicate(op traceql.Operator, operands traceql.Operands, isSpan return nil, nil } - for _, op := range operands { - if op.Type != traceql.TypeString { - return nil, fmt.Errorf("operand is not string: %+v", op) - } + s := operands[0].EncodeToString(false) + if operands[0].Type != traceql.TypeString { + return nil, fmt.Errorf("operand is not string: %s", s) } - s := operands[0].S - var id []byte id, err := util.HexStringToTraceID(s) if isSpan { @@ -2329,7 +2323,7 @@ func createBytesPredicate(op traceql.Operator, operands traceql.Operands, isSpan case traceql.OpNotEqual: return parquetquery.NewByteNotEqualPredicate(id), nil default: - return nil, fmt.Errorf("operand not supported for IDs: %+v", op) + return nil, fmt.Errorf("operator not supported for IDs: %+v", op) } } @@ -2341,15 +2335,19 @@ func createIntPredicate(op traceql.Operator, operands traceql.Operands) (parquet var i int64 switch operands[0].Type { case traceql.TypeInt: - i = int64(operands[0].N) + n, _ := operands[0].Int() + i = int64(n) case traceql.TypeDuration: - i = operands[0].D.Nanoseconds() + d, _ := operands[0].Duration() + i = d.Nanoseconds() case traceql.TypeStatus: - i = int64(StatusCodeMapping[operands[0].Status.String()]) + st, _ := operands[0].Status() + i = int64(StatusCodeMapping[st.String()]) case traceql.TypeKind: - i = int64(KindMapping[operands[0].Kind.String()]) + k, _ := operands[0].Kind() + i = int64(KindMapping[k.String()]) default: - return nil, fmt.Errorf("operand is not int, duration, status or kind: %+v", operands[0]) + return nil, fmt.Errorf("operand is not int, duration, status or kind: %s", operands[0].EncodeToString(false)) } switch op { @@ -2366,7 +2364,7 @@ func createIntPredicate(op traceql.Operator, operands traceql.Operands) (parquet case traceql.OpLessEqual: return parquetquery.NewIntLessEqualPredicate(i), nil default: - return nil, fmt.Errorf("operand not supported for integers: %+v", op) + return nil, fmt.Errorf("operator not supported for integers: %+v", op) } } @@ -2377,26 +2375,26 @@ func createFloatPredicate(op traceql.Operator, operands traceql.Operands) (parqu // Ensure operand is float if operands[0].Type != traceql.TypeFloat { - return nil, fmt.Errorf("operand is not float: %+v", operands[0]) + return nil, fmt.Errorf("operand is not float: %s", operands[0].EncodeToString(false)) } - i := operands[0].F + f := operands[0].Float() switch op { case traceql.OpEqual: - return parquetquery.NewFloatEqualPredicate(i), nil + return parquetquery.NewFloatEqualPredicate(f), nil case traceql.OpNotEqual: - return parquetquery.NewFloatNotEqualPredicate(i), nil + return parquetquery.NewFloatNotEqualPredicate(f), nil case traceql.OpGreater: - return parquetquery.NewFloatGreaterPredicate(i), nil + return parquetquery.NewFloatGreaterPredicate(f), nil case traceql.OpGreaterEqual: - return parquetquery.NewFloatGreaterEqualPredicate(i), nil + return parquetquery.NewFloatGreaterEqualPredicate(f), nil case traceql.OpLess: - return parquetquery.NewFloatLessPredicate(i), nil + return parquetquery.NewFloatLessPredicate(f), nil case traceql.OpLessEqual: - return parquetquery.NewFloatLessEqualPredicate(i), nil + return parquetquery.NewFloatLessEqualPredicate(f), nil default: - return nil, fmt.Errorf("operand not supported for floats: %+v", op) + return nil, fmt.Errorf("operator not supported for floats: %+v", op) } } @@ -2406,17 +2404,18 @@ func createBoolPredicate(op traceql.Operator, operands traceql.Operands) (parque } // Ensure operand is bool - if operands[0].Type != traceql.TypeBoolean { - return nil, fmt.Errorf("operand is not bool: %+v", operands[0]) + b, ok := operands[0].Bool() + if !ok { + return nil, fmt.Errorf("oparand is not bool: %+v", operands[0].EncodeToString(false)) } switch op { case traceql.OpEqual: - return parquetquery.NewBoolEqualPredicate(operands[0].B), nil + return parquetquery.NewBoolEqualPredicate(b), nil case traceql.OpNotEqual: - return parquetquery.NewBoolNotEqualPredicate(operands[0].B), nil + return parquetquery.NewBoolNotEqualPredicate(b), nil default: - return nil, fmt.Errorf("operand not supported for booleans: %+v", op) + return nil, fmt.Errorf("operator not supported for booleans: %+v", op) } } diff --git a/tempodb/encoding/vparquet4/coalesce_conditions.go b/tempodb/encoding/vparquet4/coalesce_conditions.go index 8497f3bb329..143a2e292fc 100644 --- a/tempodb/encoding/vparquet4/coalesce_conditions.go +++ b/tempodb/encoding/vparquet4/coalesce_conditions.go @@ -74,7 +74,7 @@ func operandsEqual(c1, c2 traceql.Condition) bool { // todo: sort first? for i := 0; i < len(c1.Operands); i++ { - if c1.Operands[i] != c2.Operands[i] { + if !c1.Operands[i].StrictEquals(&c2.Operands[i]) { return false } } diff --git a/tempodb/tempodb_search_test.go b/tempodb/tempodb_search_test.go index fd1ad693be5..5ae4178a360 100644 --- a/tempodb/tempodb_search_test.go +++ b/tempodb/tempodb_search_test.go @@ -2218,14 +2218,17 @@ func TestWALBlockGetMetrics(t *testing.T) { require.NoError(t, err) one := traceqlmetrics.MetricSeries{traceqlmetrics.KeyValue{Key: "name", Value: traceql.NewStaticString("1")}} + oneK := one.MetricKeys() + two := traceqlmetrics.MetricSeries{traceqlmetrics.KeyValue{Key: "name", Value: traceql.NewStaticString("2")}} + twoK := two.MetricKeys() require.Equal(t, 2, len(res.Series)) require.Equal(t, 2, res.SpanCount) - require.Equal(t, 1, res.Series[one].Count()) - require.Equal(t, 1, res.Series[two].Count()) - require.Equal(t, uint64(1), res.Series[one].Percentile(1.0)) // The only span was 1ns - require.Equal(t, uint64(2), res.Series[two].Percentile(1.0)) // The only span was 2ns + require.Equal(t, 1, res.Series[oneK].Histogram.Count()) + require.Equal(t, 1, res.Series[twoK].Histogram.Count()) + require.Equal(t, uint64(1), res.Series[oneK].Histogram.Percentile(1.0)) // The only span was 1ns + require.Equal(t, uint64(2), res.Series[twoK].Histogram.Percentile(1.0)) // The only span was 2ns } func TestSearchForTagsAndTagValues(t *testing.T) {