Replace CombineTraceProtos with new Combiner #1291

Merged 7 commits on Feb 24, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@
* [ENHANCEMENT] Enterprise jsonnet: add config to create tokengen job explicitly [#1256](https://github.com/grafana/tempo/pull/1256) (@kvrhdn)
* [ENHANCEMENT] Add new scaling alerts to the tempo-mixin [#1292](https://github.com/grafana/tempo/pull/1292) (@mapno)
* [ENHANCEMENT] Improve serverless handler error messages [#1305](https://github.com/grafana/tempo/pull/1305) (@joe-elliott)
* [ENHANCEMENT] Make trace combination/compaction more efficient [#1291](https://github.com/grafana/tempo/pull/1291) (@mdisibio)
* [BUGFIX]: Remove unnecessary PersistentVolumeClaim [#1245](https://github.com/grafana/tempo/issues/1245)
* [BUGFIX] Fixed issue when query-frontend doesn't log request details when request is cancelled [#1136](https://github.com/grafana/tempo/issues/1136) (@adityapwr)
* [BUGFIX] Update OTLP port in examples (docker-compose & kubernetes) from legacy ports (55680/55681) to new ports (4317/4318) [#1294](https://github.com/grafana/tempo/pull/1294) (@mapno)
11 changes: 6 additions & 5 deletions cmd/tempo-cli/cmd-query-blocks.go
@@ -47,13 +47,13 @@ func (cmd *queryBlocksCmd) Run(ctx *globalOptions) error {
}

var (
combinedTrace *tempopb.Trace
marshaller = new(jsonpb.Marshaler)
jsonBytes = bytes.Buffer{}
combiner = trace.NewCombiner()
marshaller = new(jsonpb.Marshaler)
jsonBytes = bytes.Buffer{}
)

fmt.Println()
for _, result := range results {
for i, result := range results {
fmt.Println(result.blockID, ":")

err := marshaller.Marshal(&jsonBytes, result.trace)
@@ -64,9 +64,10 @@ func (cmd *queryBlocksCmd) Run(ctx *globalOptions) error {

fmt.Println(jsonBytes.String())
jsonBytes.Reset()
combinedTrace, _ = trace.CombineTraceProtos(result.trace, combinedTrace)
combiner.ConsumeWithFinal(result.trace, i == len(results)-1)
}

combinedTrace, _ := combiner.Result()
fmt.Println("combined:")
err = marshaller.Marshal(&jsonBytes, combinedTrace)
if err != nil {
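For context, the CLI loop above maps onto the new API as in this minimal sketch. The package name, the combineAll helper, and the partials slice are illustrative only, and the import paths are inferred from the repository layout; NewCombiner, ConsumeWithFinal, and Result are the calls introduced by this PR. Passing final=true on the last input lets the combiner skip hashing that input's spans.

package example

import (
	"github.com/grafana/tempo/pkg/model/trace"
	"github.com/grafana/tempo/pkg/tempopb"
)

// combineAll mirrors the cmd-query-blocks loop: consume every partial trace
// and mark the last one as final so its new spans are not hashed.
func combineAll(partials []*tempopb.Trace) *tempopb.Trace {
	combiner := trace.NewCombiner()
	for i, t := range partials {
		combiner.ConsumeWithFinal(t, i == len(partials)-1)
	}
	combined, _ := combiner.Result() // span count is ignored here, as in the CLI
	return combined
}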
6 changes: 4 additions & 2 deletions modules/frontend/tracebyidsharding.go
@@ -65,7 +65,8 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {

var overallError error
var totalFailedBlocks uint32
overallTrace := &tempopb.Trace{}
combiner := trace.NewCombiner()
combiner.Consume(&tempopb.Trace{}) // The query path returns a non-nil result even when there are no inputs (other paths return nil for no inputs)
statusCode := http.StatusNotFound
statusMsg := "trace not found"

@@ -139,7 +140,7 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {

// happy path
statusCode = http.StatusOK
overallTrace, _ = trace.CombineTraceProtos(overallTrace, traceResp.Trace)
combiner.Consume(traceResp.Trace)
}(req)
}
wg.Wait()
@@ -148,6 +149,7 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
return nil, overallError
}

overallTrace, _ := combiner.Result()
if overallTrace == nil || statusCode != http.StatusOK {
// translate non-404s into 500s. if, for instance, we get a 400 back from an internal component
// it means that we created a bad request. 400 should not be propagated back to the user b/c
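The seeding call above has a subtle purpose: the trace-by-ID query path must return a non-nil trace even when no shard contributes anything. A hedged sketch of that behavior in isolation follows; the helper name and package are illustrative, and only NewCombiner, Consume, and Result come from this PR.

package example

import (
	"github.com/grafana/tempo/pkg/model/trace"
	"github.com/grafana/tempo/pkg/tempopb"
)

// combineShardResponses illustrates the frontend pattern: seed the combiner
// with an empty trace so Result() is non-nil even if every shard comes back
// empty, keeping this path's behavior distinct from paths that return nil.
func combineShardResponses(responses []*tempopb.Trace) *tempopb.Trace {
	combiner := trace.NewCombiner()
	combiner.Consume(&tempopb.Trace{}) // seed: guarantees a non-nil result

	for _, t := range responses {
		if t == nil {
			continue // shard had no data for this trace ID
		}
		combiner.Consume(t)
	}

	overallTrace, _ := combiner.Result()
	return overallTrace
}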
14 changes: 9 additions & 5 deletions modules/querier/querier.go
@@ -167,7 +167,7 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
span, ctx := opentracing.StartSpanFromContext(ctx, "Querier.FindTraceByID")
defer span.Finish()

var completeTrace *tempopb.Trace
combiner := trace.NewCombiner()
var spanCount, spanCountTotal, traceCountTotal int
if req.QueryMode == QueryModeIngesters || req.QueryMode == QueryModeAll {
replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
@@ -184,16 +184,18 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
return nil, errors.Wrap(err, "error querying ingesters in Querier.FindTraceByID")
}

found := false
for _, r := range responses {
t := r.response.(*tempopb.TraceByIDResponse).Trace
if t != nil {
completeTrace, spanCount = trace.CombineTraceProtos(completeTrace, t)
spanCount = combiner.Consume(t)
spanCountTotal += spanCount
traceCountTotal++
found = true
}
}
span.LogFields(ot_log.String("msg", "done searching ingesters"),
ot_log.Bool("found", completeTrace != nil),
ot_log.Bool("found", found),
ot_log.Int("combinedSpans", spanCountTotal),
ot_log.Int("combinedTraces", traceCountTotal))
}
@@ -225,17 +227,19 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
}
}

completeTrace, spanCount = trace.CombineTraceProtos(completeTrace, storeTrace)
spanCount = combiner.Consume(storeTrace)
spanCountTotal += spanCount
traceCountTotal++

span.LogFields(ot_log.String("msg", "combined trace protos from store"),
ot_log.Bool("found", completeTrace != nil),
ot_log.Bool("found", len(partialTraces) > 0),
ot_log.Int("combinedSpans", spanCountTotal),
ot_log.Int("combinedTraces", traceCountTotal))
}
}

completeTrace, _ := combiner.Result()

return &tempopb.TraceByIDResponse{
Trace: completeTrace,
Metrics: &tempopb.TraceByIDMetrics{
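Putting the querier changes together: Consume also reports how many spans each subsequent input added after deduplication, which is what feeds spanCountTotal and traceCountTotal in the log fields above. A minimal sketch, with the helper name and package invented for illustration and import paths assumed from the repository layout:

package example

import (
	"github.com/grafana/tempo/pkg/model/trace"
	"github.com/grafana/tempo/pkg/tempopb"
)

// countCombinedSpans accumulates the same style of metrics as the querier:
// each Consume returns the number of spans that input contributed after
// deduplication (0 for the first input, which is adopted wholesale).
func countCombinedSpans(partials []*tempopb.Trace) (combined *tempopb.Trace, spanTotal, traceTotal int) {
	combiner := trace.NewCombiner()
	for _, t := range partials {
		if t == nil {
			continue
		}
		spanTotal += combiner.Consume(t)
		traceTotal++
	}
	combined, _ = combiner.Result()
	return combined, spanTotal, traceTotal
}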
5 changes: 4 additions & 1 deletion pkg/model/combine.go
@@ -63,7 +63,10 @@ func CombineForRead(obj []byte, dataEncoding string, t *tempopb.Trace) (*tempopb
return nil, fmt.Errorf("error unmarshalling obj (%s): %w", dataEncoding, err)
}

combined, _ := trace.CombineTraceProtos(objTrace, t)
c := trace.NewCombiner()
c.Consume(objTrace)
c.ConsumeWithFinal(t, true)
combined, _ := c.Result()

return combined, nil
}
17 changes: 0 additions & 17 deletions pkg/model/combine_test.go
@@ -1,7 +1,6 @@
package model

import (
"fmt"
"math/rand"
"testing"

@@ -102,22 +101,6 @@ func TestCombine(t *testing.T) {
}
}

func BenchmarkCombineTraceProtos(b *testing.B) {
sizes := []int{1, 10, 1000, 10000, 100000}

for _, size := range sizes {
b.Run(fmt.Sprint(size), func(b *testing.B) {
t1 := test.MakeTraceWithSpanCount(1, size, []byte{0x01, 0x02})
t2 := test.MakeTraceWithSpanCount(1, size, []byte{0x01, 0x03})

b.ResetTimer()
for i := 0; i < b.N; i++ {
trace.CombineTraceProtos(t1, t2)
}
})
}
}

func mustMarshalToObject(trace *tempopb.Trace, encoding string) []byte {
return mustMarshalToObjectWithRange(trace, encoding, 0, 0)
}
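The removed BenchmarkCombineTraceProtos has no replacement visible in this hunk. A comparable benchmark against the new Combiner could look like the sketch below; the pkg/util/test import path and the benchmark name are assumptions, while MakeTraceWithSpanCount and the Combiner calls come from this diff.

package model

import (
	"fmt"
	"testing"

	"github.com/grafana/tempo/pkg/model/trace"
	"github.com/grafana/tempo/pkg/util/test"
)

// BenchmarkCombiner mirrors the deleted benchmark: combine two traces of
// increasing size. Like CombineTraceProtos, the Combiner is destructive, so
// t1/t2 are reused across iterations exactly as the original benchmark did.
func BenchmarkCombiner(b *testing.B) {
	sizes := []int{1, 10, 1000, 10000, 100000}

	for _, size := range sizes {
		b.Run(fmt.Sprint(size), func(b *testing.B) {
			t1 := test.MakeTraceWithSpanCount(1, size, []byte{0x01, 0x02})
			t2 := test.MakeTraceWithSpanCount(1, size, []byte{0x01, 0x03})

			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				c := trace.NewCombiner()
				c.Consume(t1)
				c.ConsumeWithFinal(t2, true)
				c.Result()
			}
		})
	}
}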
149 changes: 102 additions & 47 deletions pkg/model/trace/combine.go
@@ -8,77 +8,132 @@ import (
"github.com/grafana/tempo/pkg/tempopb"
)

// CombineTraceProtos combines two trace protos into one. Note that it is destructive.
// All spans are combined into traceA. spanCountA, B, and Total are returned for
// logging purposes.
func CombineTraceProtos(traceA, traceB *tempopb.Trace) (*tempopb.Trace, int) {
// if one or the other is nil just return 0 for the one that's nil and -1 for the other. this will be a clear indication this
// code path was taken without unnecessarily counting spans
if traceA == nil {
return traceB, -1
}
// token is uint64 to reduce hash collision rates. Experimentally, it was observed
// that fnv32 could approach a collision rate of 1 in 10,000. fnv64 avoids collisions
// when tested against traces with up to 1M spans (see matching test). A collision
// results in a dropped span during combine.
type token uint64

if traceB == nil {
return traceA, -1
}
func newHash() hash.Hash64 {
return fnv.New64()
}

spanCountTotal := 0
// tokenForID returns a token for use in a hash map given a span id and span kind
// buffer must be a 4 byte slice and is reused for writing the span kind to the hashing function
// kind is used along with the actual id b/c in zipkin traces span id is not guaranteed to be unique
// as it is shared between client and server spans.
func tokenForID(h hash.Hash64, buffer []byte, kind int32, b []byte) token {
binary.LittleEndian.PutUint32(buffer, uint32(kind))

h := fnv.New32()
h.Reset()
_, _ = h.Write(b)
_, _ = h.Write(buffer)
return token(h.Sum64())
}

// Combiner combines multiple partial traces into one, deduping spans based on
// ID and kind. Note that it is destructive. Several design decisions favor
// efficiency:
// * Scan/hash the spans of each input only once; the set of seen span tokens is reused across calls.
// * Sort the final result only once, and only if anything was combined.
// * Don't scan/hash the spans of the last input (final=true).
type Combiner struct {
result *tempopb.Trace
spans map[token]struct{}
combined bool
}

func NewCombiner() *Combiner {
return &Combiner{}
}

// Consume the given trace and destructively combines its contents.
func (c *Combiner) Consume(tr *tempopb.Trace) (spanCount int) {
return c.ConsumeWithFinal(tr, false)
}

// ConsumeWithFinal consumes the trace, but allows for performance savings when
// it is known that this is the last expected input trace.
func (c *Combiner) ConsumeWithFinal(tr *tempopb.Trace, final bool) (spanCount int) {
if tr == nil {
return
}

h := newHash()
buffer := make([]byte, 4)

spansInA := make(map[uint32]struct{})
for _, batchA := range traceA.Batches {
for _, ilsA := range batchA.InstrumentationLibrarySpans {
for _, spanA := range ilsA.Spans {
spansInA[tokenForID(h, buffer, int32(spanA.Kind), spanA.SpanId)] = struct{}{}
// First call?
if c.result == nil {
c.result = tr

// Pre-alloc map with input size. This saves having to grow the
// map from the small starting size.
n := 0
for _, b := range c.result.Batches {
for _, ils := range b.InstrumentationLibrarySpans {
n += len(ils.Spans)
}
spanCountTotal += len(ilsA.Spans)
}
c.spans = make(map[token]struct{}, n)

for _, b := range c.result.Batches {
for _, ils := range b.InstrumentationLibrarySpans {
for _, s := range ils.Spans {
c.spans[tokenForID(h, buffer, int32(s.Kind), s.SpanId)] = struct{}{}
}
}
}
return
}

// loop through every span and copy spans in B that don't exist to A
for _, batchB := range traceB.Batches {
notFoundILS := batchB.InstrumentationLibrarySpans[:0]

for _, ilsB := range batchB.InstrumentationLibrarySpans {
notFoundSpans := ilsB.Spans[:0]
for _, spanB := range ilsB.Spans {
// if found in A, remove from the batch
_, ok := spansInA[tokenForID(h, buffer, int32(spanB.Kind), spanB.SpanId)]
for _, b := range tr.Batches {
notFoundILS := b.InstrumentationLibrarySpans[:0]

for _, ils := range b.InstrumentationLibrarySpans {
notFoundSpans := ils.Spans[:0]
for _, s := range ils.Spans {
// if not already encountered, then keep
token := tokenForID(h, buffer, int32(s.Kind), s.SpanId)
_, ok := c.spans[token]
if !ok {
notFoundSpans = append(notFoundSpans, spanB)
notFoundSpans = append(notFoundSpans, s)

// If this is the last expected input, we don't need to record
// the visited spans. This optimization yields significant savings.
if !final {
c.spans[token] = struct{}{}
}
}
}

if len(notFoundSpans) > 0 {
spanCountTotal += len(notFoundSpans)
ilsB.Spans = notFoundSpans
notFoundILS = append(notFoundILS, ilsB)
ils.Spans = notFoundSpans
spanCount += len(notFoundSpans)
notFoundILS = append(notFoundILS, ils)
}
}

// if there were some spans not found in A, add everything left in the batch
if len(notFoundILS) > 0 {
batchB.InstrumentationLibrarySpans = notFoundILS
traceA.Batches = append(traceA.Batches, batchB)
b.InstrumentationLibrarySpans = notFoundILS
c.result.Batches = append(c.result.Batches, b)
}
}

SortTrace(traceA)

return traceA, spanCountTotal
c.combined = true
return
}

// tokenForID returns a uint32 token for use in a hash map given a span id and span kind
// buffer must be a 4 byte slice and is reused for writing the span kind to the hashing function
// kind is used along with the actual id b/c in zipkin traces span id is not guaranteed to be unique
// as it is shared between client and server spans.
func tokenForID(h hash.Hash32, buffer []byte, kind int32, b []byte) uint32 {
binary.LittleEndian.PutUint32(buffer, uint32(kind))
// Result returns the final trace and span count.
func (c *Combiner) Result() (*tempopb.Trace, int) {
spanCount := -1

h.Reset()
_, _ = h.Write(b)
_, _ = h.Write(buffer)
return h.Sum32()
if c.result != nil && c.combined {
// Only if anything combined
SortTrace(c.result)
spanCount = len(c.spans)
}

return c.result, spanCount
}
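Taken together, the new API reads as in the sketch below. This is illustrative rather than code from the PR (package name, helper name, and import paths are assumed): consume the inputs, mark the last one final so its new spans are not hashed, then call Result, which sorts once and reports -1 as the span count when only a single input was ever seen.

package example

import (
	"github.com/grafana/tempo/pkg/model/trace"
	"github.com/grafana/tempo/pkg/tempopb"
)

// combine shows the whole Combiner lifecycle. Spans sharing a span id and
// kind across the two inputs are kept only once, and SortTrace runs a single
// time inside Result rather than after every combine as before.
func combine(first, second *tempopb.Trace) (*tempopb.Trace, int) {
	c := trace.NewCombiner()
	c.Consume(first)                 // first input is adopted and its spans hashed
	c.ConsumeWithFinal(second, true) // final input: its new spans are not hashed
	return c.Result()                // combined trace plus span count, or -1 if nothing was combined
}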