Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix an inconsistency in wal search #1548

Merged
merged 6 commits into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Additionally, default label `span_status` is renamed to `status_code`.
* [BUGFIX] Update tempo microservices Helm values example which missed the 'enabled' key for thriftHttp. [#1472](https://github.com/grafana/tempo/pull/1472) (@hajowieland)
* [BUGFIX] Fix race condition in forwarder overrides loop. [1468](https://github.com/grafana/tempo/pull/1468) (@mapno)
* [BUGFIX] Fix v2 backend check on span name to be substring [#1538](https://github.com/grafana/tempo/pull/1538) (@mdisibio)
* [BUGFIX] Fix wal check on span name to be substring [#1548](https://github.com/grafana/tempo/pull/1548) (@mdisibio)
* [ENHANCEMENT] Add a config to query single ingester instance based on trace id hash for Trace By ID API. (1484)[https://github.com/grafana/tempo/pull/1484] (@sagarwala, @bikashmishra100, @ashwinidulams)
* [ENHANCEMENT] Add blocklist metrics for total backend objects and total backend bytes [#1519](https://github.com/grafana/tempo/pull/1519) (@ie-pham)

Expand Down
105 changes: 3 additions & 102 deletions modules/distributor/search_data.go
Original file line number Diff line number Diff line change
@@ -1,113 +1,14 @@
package distributor

import (
"strconv"

"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempofb"
"github.com/grafana/tempo/pkg/tempopb"
common_v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
)

type extractTagFunc func(tag string) bool
import "github.com/grafana/tempo/pkg/model/trace"

// extractSearchDataAll returns flatbuffer search data for every trace.
func extractSearchDataAll(traces []*rebatchedTrace, extractTag extractTagFunc) [][]byte {
func extractSearchDataAll(traces []*rebatchedTrace, extractTag trace.ExtractTagFunc) [][]byte {
headers := make([][]byte, len(traces))

for i, t := range traces {
headers[i] = extractSearchData(t.trace, t.id, extractTag)
headers[i] = trace.ExtractSearchData(t.trace, t.id, extractTag)
}

return headers
}

// extractSearchData returns the flatbuffer search data for the given trace. It is extracted here
// in the distributor because this is the only place on the ingest path where the trace is available
// in object form.
func extractSearchData(tr *tempopb.Trace, id []byte, extractTag extractTagFunc) []byte {
data := &tempofb.SearchEntryMutable{}

data.TraceID = id

for _, b := range tr.Batches {
// Batch attrs
if b.Resource != nil {
for _, a := range b.Resource.Attributes {
if !extractTag(a.Key) {
continue
}
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(a.Key, s)
}
}
}

for _, ils := range b.InstrumentationLibrarySpans {
for _, s := range ils.Spans {

// Root span
if len(s.ParentSpanId) == 0 {

// Collect root.name
data.AddTag(trace.RootSpanNameTag, s.Name)

// Collect root.service.name
if b.Resource != nil {
for _, a := range b.Resource.Attributes {
if a.Key == trace.ServiceNameTag {
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(trace.RootServiceNameTag, s)
}
}
}
}
}

// Collect for any spans
data.AddTag(trace.SpanNameTag, s.Name)
if s.Status != nil {
data.AddTag(trace.StatusCodeTag, strconv.Itoa(int(s.Status.Code)))
}
data.SetStartTimeUnixNano(s.StartTimeUnixNano)
data.SetEndTimeUnixNano(s.EndTimeUnixNano)

for _, a := range s.Attributes {
if !extractTag(a.Key) {
continue
}
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(a.Key, s)
}
}
}
}
}

return data.ToBytes()
}

func extractValueAsString(v *common_v1.AnyValue) (s string, ok bool) {
vv := v.GetValue()
if vv == nil {
return "", false
}

if s, ok := vv.(*common_v1.AnyValue_StringValue); ok {
return s.StringValue, true
}

if b, ok := vv.(*common_v1.AnyValue_BoolValue); ok {
return strconv.FormatBool(b.BoolValue), true
}

if i, ok := vv.(*common_v1.AnyValue_IntValue); ok {
return strconv.FormatInt(i.IntValue, 10), true
}

if d, ok := vv.(*common_v1.AnyValue_DoubleValue); ok {
return strconv.FormatFloat(d.DoubleValue, 'g', -1, 64), true
}

return "", false
}
101 changes: 101 additions & 0 deletions pkg/model/trace/search_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package trace

import (
"strconv"

"github.com/grafana/tempo/pkg/tempofb"
"github.com/grafana/tempo/pkg/tempopb"
common_v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
)

type ExtractTagFunc func(tag string) bool

// ExtractSearchData returns the flatbuffer search data for the given trace. It is extracted here
// in the distributor because this is the only place on the ingest path where the trace is available
// in object form.
func ExtractSearchData(tr *tempopb.Trace, id []byte, extractTag ExtractTagFunc) []byte {
data := &tempofb.SearchEntryMutable{}

data.TraceID = id

for _, b := range tr.Batches {
// Batch attrs
if b.Resource != nil {
for _, a := range b.Resource.Attributes {
if !extractTag(a.Key) {
continue
}
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(a.Key, s)
}
}
}

for _, ils := range b.InstrumentationLibrarySpans {
for _, s := range ils.Spans {

// Root span
if len(s.ParentSpanId) == 0 {

// Collect root.name
data.AddTag(RootSpanNameTag, s.Name)

// Collect root.service.name
if b.Resource != nil {
for _, a := range b.Resource.Attributes {
if a.Key == ServiceNameTag {
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(RootServiceNameTag, s)
}
}
}
}
}

// Collect for any spans
data.AddTag(SpanNameTag, s.Name)
if s.Status != nil {
data.AddTag(StatusCodeTag, strconv.Itoa(int(s.Status.Code)))
}
data.SetStartTimeUnixNano(s.StartTimeUnixNano)
data.SetEndTimeUnixNano(s.EndTimeUnixNano)

for _, a := range s.Attributes {
if !extractTag(a.Key) {
continue
}
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(a.Key, s)
}
}
}
}
}

return data.ToBytes()
}

func extractValueAsString(v *common_v1.AnyValue) (s string, ok bool) {
vv := v.GetValue()
if vv == nil {
return "", false
}

if s, ok := vv.(*common_v1.AnyValue_StringValue); ok {
return s.StringValue, true
}

if b, ok := vv.(*common_v1.AnyValue_BoolValue); ok {
return strconv.FormatBool(b.BoolValue), true
}

if i, ok := vv.(*common_v1.AnyValue_IntValue); ok {
return strconv.FormatInt(i.IntValue, 10), true
}

if d, ok := vv.(*common_v1.AnyValue_DoubleValue); ok {
return strconv.FormatFloat(d.DoubleValue, 'g', -1, 64), true
}

return "", false
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package distributor
package trace

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempofb"
"github.com/grafana/tempo/pkg/tempopb"
v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1"
Expand All @@ -20,7 +19,7 @@ func TestExtractSearchData(t *testing.T) {
name string
trace *tempopb.Trace
id []byte
extractTag extractTagFunc
extractTag ExtractTagFunc
searchData *tempofb.SearchEntryMutable
}{
{
Expand Down Expand Up @@ -64,11 +63,11 @@ func TestExtractSearchData(t *testing.T) {
searchData: &tempofb.SearchEntryMutable{
TraceID: traceIDA,
Tags: tempofb.NewSearchDataMapWithData(map[string][]string{
"foo": {"bar"},
trace.RootSpanNameTag: {"firstSpan"},
trace.SpanNameTag: {"firstSpan"},
trace.RootServiceNameTag: {"baz"},
trace.ServiceNameTag: {"baz"},
"foo": {"bar"},
RootSpanNameTag: {"firstSpan"},
SpanNameTag: {"firstSpan"},
RootServiceNameTag: {"baz"},
ServiceNameTag: {"baz"},
}),
StartTimeUnixNano: 0,
EndTimeUnixNano: 0,
Expand Down Expand Up @@ -118,7 +117,7 @@ func TestExtractSearchData(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.searchData.ToBytes(), extractSearchData(tc.trace, tc.id, tc.extractTag))
assert.Equal(t, tc.searchData.ToBytes(), ExtractSearchData(tc.trace, tc.id, tc.extractTag))
})
}
}
Loading