Skip to content

Commit

Permalink
Refactoring: implement arrays for traceql.Static with reused fields (#…
Browse files Browse the repository at this point in the history
…3827)

* Refactore traceql.Static implementation with reused struct fields

The new implementation is meant to support static values representing
[]int, []float, []bool, and []string in the future without increasing
the struct size

* Fix common errors with new traceql.Static in traceql pkg

* Use traceql.StaticMapKey in maps instead of traceql.Static

* Fix bug with .foo!=nil

* Static value access methods return bolean ok value, not error

* Use traceql.StaticMapKey as map key in traceqlmetrics

* Adjust use of traceql.Static in vp2

* Adjust use of traceql.Static in vp3

* Add method Static.StrictEquals(o)

* Adjust use of traceql.Static in vp4

* Use traceql.StaticMapKey as map key in tempodb

* Fix linter warnings

* Add test for Static.MapKey()

* Add support for string arrays to traceql.Static

* Add support for float and bool arrays to traceql.Static

* Create static nil values by calling traceql.NewStaticNil()

* Remove TODOs and panics

* Replace more Static{} with traceql.NewStaticNil()

* Improve string representation for arrays

* CHANGELOG.md

* Remove redundant type check

* Use fnv1a instead of crc32 to calculate StaticMapKey hash

---------

Co-authored-by: Mario <[email protected]>
  • Loading branch information
stoewer and mapno authored Jul 16, 2024
1 parent 4082749 commit 6a6a7d1
Show file tree
Hide file tree
Showing 25 changed files with 1,531 additions and 625 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [FEATURE] Flush and query RF1 blocks for TraceQL metric queries [#3628](https://github.com/grafana/tempo/pull/3628) [#3691](https://github.com/grafana/tempo/pull/3691) [#3723](https://github.com/grafana/tempo/pull/3723) (@mapno)
* [FEATURE] Add new compare() metrics function [#3695](https://github.com/grafana/tempo/pull/3695) (@mdisibio)
* [FEATURE] Add a `q` parameter to `/api/v2/serach/tags` for tag name filtering [#3822](https://github.com/grafana/tempo/pull/3822) (@joe-elliott)
* [ENHANCEMENT] Implement arrays for traceql.Static with reused fields [#3827](https://github.com/grafana/tempo/pull/3827) (@stoewer)
* [ENHANCEMENT] Tag value lookup use protobuf internally for improved latency [#3731](https://github.com/grafana/tempo/pull/3731) (@mdisibio)
* [ENHANCEMENT] TraceQL metrics queries use protobuf internally for improved latency [#3745](https://github.com/grafana/tempo/pull/3745) (@mdisibio)
* [ENHANCEMENT] Add local disk caching of metrics queries in local-blocks processor [#3799](https://github.com/grafana/tempo/pull/3799) (@mdisibio)
Expand Down
53 changes: 35 additions & 18 deletions modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,10 @@ func (p *Processor) GetMetrics(ctx context.Context, req *tempopb.SpanMetricsRequ

var rawHistorgram *tempopb.RawHistogram
var errCount int
for series, hist := range m.Series {
for keys, sh := range m.Series {
h := []*tempopb.RawHistogram{}

for bucket, count := range hist.Buckets() {
for bucket, count := range sh.Histogram.Buckets() {
if count != 0 {
rawHistorgram = &tempopb.RawHistogram{
Bucket: uint64(bucket),
Expand All @@ -481,13 +481,13 @@ func (p *Processor) GetMetrics(ctx context.Context, req *tempopb.SpanMetricsRequ
}

errCount = 0
if errs, ok := m.Errors[series]; ok {
if errs, ok := m.Errors[keys]; ok {
errCount = errs
}

resp.Metrics = append(resp.Metrics, &tempopb.SpanMetrics{
LatencyHistogram: h,
Series: metricSeriesToProto(series),
Series: metricSeriesToProto(sh.Series),
Errors: uint64(errCount),
})
}
Expand Down Expand Up @@ -802,25 +802,42 @@ func metricSeriesToProto(series traceqlmetrics.MetricSeries) []*tempopb.KeyValue
var r []*tempopb.KeyValue
for _, kv := range series {
if kv.Key != "" {
static := kv.Value
r = append(r, &tempopb.KeyValue{
Key: kv.Key,
Value: &tempopb.TraceQLStatic{
Type: int32(static.Type),
N: int64(static.N),
F: static.F,
S: static.S,
B: static.B,
D: uint64(static.D),
Status: int32(static.Status),
Kind: int32(static.Kind),
},
})
r = append(r, traceQLStaticToProto(&kv))
}
}
return r
}

func traceQLStaticToProto(kv *traceqlmetrics.KeyValue) *tempopb.KeyValue {
val := tempopb.TraceQLStatic{Type: int32(kv.Value.Type)}

switch kv.Value.Type {
case traceql.TypeInt:
n, _ := kv.Value.Int()
val.N = int64(n)
case traceql.TypeFloat:
val.F = kv.Value.Float()
case traceql.TypeString:
val.S = kv.Value.EncodeToString(false)
case traceql.TypeBoolean:
b, _ := kv.Value.Bool()
val.B = b
case traceql.TypeDuration:
d, _ := kv.Value.Duration()
val.D = uint64(d)
case traceql.TypeStatus:
st, _ := kv.Value.Status()
val.Status = int32(st)
case traceql.TypeKind:
k, _ := kv.Value.Kind()
val.Kind = int32(k)
default:
val = tempopb.TraceQLStatic{Type: int32(traceql.TypeNil)}
}

return &tempopb.KeyValue{Key: kv.Key, Value: &val}
}

// filterBatches to only root spans or kind==server. Does not modify the input
// but returns a new struct referencing the same input pointers. Returns nil
// if there were no matching spans.
Expand Down
55 changes: 34 additions & 21 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,35 +710,36 @@ func (q *Querier) SpanMetricsSummary(
}

// Combine the results
yyy := make(map[traceqlmetrics.MetricSeries]*traceqlmetrics.LatencyHistogram)
xxx := make(map[traceqlmetrics.MetricSeries]*tempopb.SpanMetricsSummary)
yyy := make(map[traceqlmetrics.MetricKeys]*traceqlmetrics.LatencyHistogram)
xxx := make(map[traceqlmetrics.MetricKeys]*tempopb.SpanMetricsSummary)

var h *traceqlmetrics.LatencyHistogram
var s traceqlmetrics.MetricSeries
for _, r := range results {
for _, m := range r.Metrics {
s = protoToMetricSeries(m.Series)
k := s.MetricKeys()

if _, ok := xxx[s]; !ok {
xxx[s] = &tempopb.SpanMetricsSummary{Series: m.Series}
if _, ok := xxx[k]; !ok {
xxx[k] = &tempopb.SpanMetricsSummary{Series: m.Series}
}

xxx[s].ErrorSpanCount += m.Errors
xxx[k].ErrorSpanCount += m.Errors

var b [64]int
for _, l := range m.GetLatencyHistogram() {
// Reconstitude the bucket
b[l.Bucket] += int(l.Count)
// Add to the total
xxx[s].SpanCount += l.Count
xxx[k].SpanCount += l.Count
}

// Combine the histogram
h = traceqlmetrics.New(b)
if _, ok := yyy[s]; !ok {
yyy[s] = h
if _, ok := yyy[k]; !ok {
yyy[k] = h
} else {
yyy[s].Combine(*h)
yyy[k].Combine(*h)
}
}
}
Expand Down Expand Up @@ -1095,18 +1096,30 @@ func protoToMetricSeries(proto []*tempopb.KeyValue) traceqlmetrics.MetricSeries
return r
}

func protoToTraceQLStatic(proto *tempopb.KeyValue) traceqlmetrics.KeyValue {
func protoToTraceQLStatic(kv *tempopb.KeyValue) traceqlmetrics.KeyValue {
var val traceql.Static

switch traceql.StaticType(kv.Value.Type) {
case traceql.TypeInt:
val = traceql.NewStaticInt(int(kv.Value.N))
case traceql.TypeFloat:
val = traceql.NewStaticFloat(kv.Value.F)
case traceql.TypeString:
val = traceql.NewStaticString(kv.Value.S)
case traceql.TypeBoolean:
val = traceql.NewStaticBool(kv.Value.B)
case traceql.TypeDuration:
val = traceql.NewStaticDuration(time.Duration(kv.Value.D))
case traceql.TypeStatus:
val = traceql.NewStaticStatus(traceql.Status(kv.Value.Status))
case traceql.TypeKind:
val = traceql.NewStaticKind(traceql.Kind(kv.Value.Kind))
default:
val = traceql.NewStaticNil()
}

return traceqlmetrics.KeyValue{
Key: proto.Key,
Value: traceql.Static{
Type: traceql.StaticType(proto.Value.Type),
N: int(proto.Value.N),
F: proto.Value.F,
S: proto.Value.S,
B: proto.Value.B,
D: time.Duration(proto.Value.D),
Status: traceql.Status(proto.Value.Status),
Kind: traceql.Kind(proto.Value.Kind),
},
Key: kv.Key,
Value: val,
}
}
Loading

0 comments on commit 6a6a7d1

Please sign in to comment.