Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace CombineTraceProtos with new Combiner #1291

Merged
merged 7 commits into from
Feb 24, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions cmd/tempo-cli/cmd-query-blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ func (cmd *queryBlocksCmd) Run(ctx *globalOptions) error {
}

var (
combinedTrace *tempopb.Trace
marshaller = new(jsonpb.Marshaler)
jsonBytes = bytes.Buffer{}
combiner = trace.NewCombiner()
marshaller = new(jsonpb.Marshaler)
jsonBytes = bytes.Buffer{}
)

fmt.Println()
for _, result := range results {
for i, result := range results {
fmt.Println(result.blockID, ":")

err := marshaller.Marshal(&jsonBytes, result.trace)
Expand All @@ -64,9 +64,10 @@ func (cmd *queryBlocksCmd) Run(ctx *globalOptions) error {

fmt.Println(jsonBytes.String())
jsonBytes.Reset()
combinedTrace, _ = trace.CombineTraceProtos(result.trace, combinedTrace)
combiner.ConsumeWithFinal(result.trace, i == len(results)-1)
}

combinedTrace, _ := combiner.Result()
fmt.Println("combined:")
err = marshaller.Marshal(&jsonBytes, combinedTrace)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions modules/frontend/tracebyidsharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {

var overallError error
var totalFailedBlocks uint32
overallTrace := &tempopb.Trace{}
combiner := trace.NewCombiner()
combiner.Consume(&tempopb.Trace{}) // The query path returns a non-nil result even if no inputs (which is different than other paths which return nil for no inputs)
statusCode := http.StatusNotFound
statusMsg := "trace not found"

Expand Down Expand Up @@ -139,7 +140,7 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {

// happy path
statusCode = http.StatusOK
overallTrace, _ = trace.CombineTraceProtos(overallTrace, traceResp.Trace)
combiner.Consume(traceResp.Trace)
}(req)
}
wg.Wait()
Expand All @@ -148,6 +149,7 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
return nil, overallError
}

overallTrace, _ := combiner.Result()
if overallTrace == nil || statusCode != http.StatusOK {
// translate non-404s into 500s. if, for instance, we get a 400 back from an internal component
// it means that we created a bad request. 400 should not be propagated back to the user b/c
Expand Down
14 changes: 9 additions & 5 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
span, ctx := opentracing.StartSpanFromContext(ctx, "Querier.FindTraceByID")
defer span.Finish()

var completeTrace *tempopb.Trace
combiner := trace.NewCombiner()
var spanCount, spanCountTotal, traceCountTotal int
if req.QueryMode == QueryModeIngesters || req.QueryMode == QueryModeAll {
replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
Expand All @@ -184,16 +184,18 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
return nil, errors.Wrap(err, "error querying ingesters in Querier.FindTraceByID")
}

found := false
for _, r := range responses {
t := r.response.(*tempopb.TraceByIDResponse).Trace
if t != nil {
completeTrace, spanCount = trace.CombineTraceProtos(completeTrace, t)
spanCount = combiner.Consume(t)
spanCountTotal += spanCount
traceCountTotal++
found = true
}
}
span.LogFields(ot_log.String("msg", "done searching ingesters"),
ot_log.Bool("found", completeTrace != nil),
ot_log.Bool("found", found),
ot_log.Int("combinedSpans", spanCountTotal),
ot_log.Int("combinedTraces", traceCountTotal))
}
Expand Down Expand Up @@ -225,17 +227,19 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
}
}

completeTrace, spanCount = trace.CombineTraceProtos(completeTrace, storeTrace)
spanCount = combiner.Consume(storeTrace)
spanCountTotal += spanCount
traceCountTotal++

span.LogFields(ot_log.String("msg", "combined trace protos from store"),
ot_log.Bool("found", completeTrace != nil),
ot_log.Bool("found", len(partialTraces) > 0),
ot_log.Int("combinedSpans", spanCountTotal),
ot_log.Int("combinedTraces", traceCountTotal))
}
}

completeTrace, _ := combiner.Result()

return &tempopb.TraceByIDResponse{
Trace: completeTrace,
Metrics: &tempopb.TraceByIDMetrics{
Expand Down
17 changes: 0 additions & 17 deletions pkg/model/combine_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package model

import (
"fmt"
"math/rand"
"testing"

Expand Down Expand Up @@ -102,22 +101,6 @@ func TestCombine(t *testing.T) {
}
}

func BenchmarkCombineTraceProtos(b *testing.B) {
sizes := []int{1, 10, 1000, 10000, 100000}

for _, size := range sizes {
b.Run(fmt.Sprint(size), func(b *testing.B) {
t1 := test.MakeTraceWithSpanCount(1, size, []byte{0x01, 0x02})
t2 := test.MakeTraceWithSpanCount(1, size, []byte{0x01, 0x03})

b.ResetTimer()
for i := 0; i < b.N; i++ {
trace.CombineTraceProtos(t1, t2)
}
})
}
}

func mustMarshalToObject(trace *tempopb.Trace, encoding string) []byte {
return mustMarshalToObjectWithRange(trace, encoding, 0, 0)
}
Expand Down
118 changes: 113 additions & 5 deletions pkg/model/trace/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/grafana/tempo/pkg/tempopb"
)

// Deprecated: This is only capable of pairwise combination. It is replaced by Combiner.
// CombineTraceProtos combines two trace protos into one. Note that it is destructive.
// All spans are combined into traceA. spanCountA, B, and Total are returned for
// logging purposes.
Expand All @@ -24,10 +25,10 @@ func CombineTraceProtos(traceA, traceB *tempopb.Trace) (*tempopb.Trace, int) {

spanCountTotal := 0

h := fnv.New32()
h := newHash()
buffer := make([]byte, 4)

spansInA := make(map[uint32]struct{})
spansInA := make(map[token]struct{})
for _, batchA := range traceA.Batches {
for _, ilsA := range batchA.InstrumentationLibrarySpans {
for _, spanA := range ilsA.Spans {
Expand Down Expand Up @@ -70,15 +71,122 @@ func CombineTraceProtos(traceA, traceB *tempopb.Trace) (*tempopb.Trace, int) {
return traceA, spanCountTotal
}

// tokenForID returns a uint32 token for use in a hash map given a span id and span kind
// token is unint64 to reduce hash collision rates. Experimentally, it was observed
// that fnv32 could approach a collision rate of 1 in 10,000. fnv64 avoids collisions
// when tested against traces with up to 1M spans (see matching test). A collision
// results in a dropped span during combine.
type token uint64

func newHash() hash.Hash64 {
return fnv.New64()
}

// tokenForID returns a token for use in a hash map given a span id and span kind
// buffer must be a 4 byte slice and is reused for writing the span kind to the hashing function
// kind is used along with the actual id b/c in zipkin traces span id is not guaranteed to be unique
// as it is shared between client and server spans.
func tokenForID(h hash.Hash32, buffer []byte, kind int32, b []byte) uint32 {
func tokenForID(h hash.Hash64, buffer []byte, kind int32, b []byte) token {
binary.LittleEndian.PutUint32(buffer, uint32(kind))

h.Reset()
_, _ = h.Write(b)
_, _ = h.Write(buffer)
return h.Sum32()
return token(h.Sum64())
}

// Combiner combines multiple partial traces into one, deduping spans based on
// ID and kind. Note that it is destructive. There are several efficiency
// improvements over the previous pairwise CombineTraceProtos:
// * Only scan/hash the spans for each input once
// * Only sort the final result once.
type Combiner struct {
result *tempopb.Trace
spans map[token]struct{}
combined bool
}

func NewCombiner() *Combiner {
return &Combiner{
spans: map[token]struct{}{},
}
}

// Consume the given trace and destructively combines its contents.
func (c *Combiner) Consume(tr *tempopb.Trace) (spanCount int) {
return c.ConsumeWithFinal(tr, false)
}

// ConsumeWithFinal consumes the trace, but allows for performance savings when
// it is known that this is the last expected input trace.
func (c *Combiner) ConsumeWithFinal(tr *tempopb.Trace, final bool) (spanCount int) {
if tr == nil {
return
}

h := newHash()
buffer := make([]byte, 4)

// First call?
if c.result == nil {
c.result = tr
for _, b := range c.result.Batches {
for _, ils := range b.InstrumentationLibrarySpans {
for _, s := range ils.Spans {
c.spans[tokenForID(h, buffer, int32(s.Kind), s.SpanId)] = struct{}{}
}
}
}
return
}

// loop through every span and copy spans in B that don't exist to A
for _, b := range tr.Batches {
notFoundILS := b.InstrumentationLibrarySpans[:0]

for _, ils := range b.InstrumentationLibrarySpans {
notFoundSpans := ils.Spans[:0]
for _, s := range ils.Spans {
// if not already encountered, then keep
token := tokenForID(h, buffer, int32(s.Kind), s.SpanId)
_, ok := c.spans[token]
if !ok {
notFoundSpans = append(notFoundSpans, s)

// If last expected input, then we don't need to record
// the visited spans. Optimization has significant savings.
if !final {
c.spans[token] = struct{}{}
}
}
}

if len(notFoundSpans) > 0 {
ils.Spans = notFoundSpans
spanCount += len(notFoundSpans)
notFoundILS = append(notFoundILS, ils)
}
}

// if there were some spans not found in A, add everything left in the batch
if len(notFoundILS) > 0 {
b.InstrumentationLibrarySpans = notFoundILS
c.result.Batches = append(c.result.Batches, b)
}
}

c.combined = true
return
}

// Result returns the final trace and span count.
func (c *Combiner) Result() (*tempopb.Trace, int) {
spanCount := -1

if c.result != nil && c.combined {
// Only if anything combined
SortTrace(c.result)
spanCount = len(c.spans)
}

return c.result, spanCount
}
Loading