From a4659c87e81141ebbb675cab218d88f9a6701b72 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Tue, 8 Feb 2022 16:27:26 -0500 Subject: [PATCH 1/6] Create trace.Combiner which has better performance on multiple/larger trace deduping --- pkg/model/trace/combine.go | 97 ++++++++++++++++++++++++-- pkg/model/trace/combine_test.go | 116 ++++++++++++++++++++++++++++++-- pkg/model/v1/object_decoder.go | 5 +- pkg/model/v2/object_decoder.go | 6 +- 4 files changed, 211 insertions(+), 13 deletions(-) diff --git a/pkg/model/trace/combine.go b/pkg/model/trace/combine.go index 2d4dd1a7d4c..e6da1f33163 100644 --- a/pkg/model/trace/combine.go +++ b/pkg/model/trace/combine.go @@ -24,10 +24,10 @@ func CombineTraceProtos(traceA, traceB *tempopb.Trace) (*tempopb.Trace, int) { spanCountTotal := 0 - h := fnv.New32() + h := newHash() buffer := make([]byte, 4) - spansInA := make(map[uint32]struct{}) + spansInA := make(map[token]struct{}) for _, batchA := range traceA.Batches { for _, ilsA := range batchA.InstrumentationLibrarySpans { for _, spanA := range ilsA.Spans { @@ -70,15 +70,104 @@ func CombineTraceProtos(traceA, traceB *tempopb.Trace) (*tempopb.Trace, int) { return traceA, spanCountTotal } +type token uint64 + +func newHash() hash.Hash64 { + return fnv.New64() +} + // tokenForID returns a uint32 token for use in a hash map given a span id and span kind // buffer must be a 4 byte slice and is reused for writing the span kind to the hashing function // kind is used along with the actual id b/c in zipkin traces span id is not guaranteed to be unique // as it is shared between client and server spans. -func tokenForID(h hash.Hash32, buffer []byte, kind int32, b []byte) uint32 { +func tokenForID(h hash.Hash64, buffer []byte, kind int32, b []byte) token { binary.LittleEndian.PutUint32(buffer, uint32(kind)) h.Reset() _, _ = h.Write(b) _, _ = h.Write(buffer) - return h.Sum32() + return token(h.Sum64()) +} + +type Combiner struct { + result *tempopb.Trace + spans map[token]struct{} + combined bool +} + +func NewCombiner() *Combiner { + return &Combiner{ + spans: map[token]struct{}{}, + } +} + +func (c *Combiner) ConsumeAll(traces ...*tempopb.Trace) { + for _, t := range traces { + c.Consume(t) + } +} + +func (c *Combiner) Consume(tr *tempopb.Trace) { + if tr == nil { + return + } + + h := newHash() + buffer := make([]byte, 4) + + // First call? + if c.result == nil { + c.result = tr + for _, b := range c.result.Batches { + for _, ils := range b.InstrumentationLibrarySpans { + for _, s := range ils.Spans { + c.spans[tokenForID(h, buffer, int32(s.Kind), s.SpanId)] = struct{}{} + } + } + } + return + } + + // loop through every span and copy spans in B that don't exist to A + for _, b := range tr.Batches { + notFoundILS := b.InstrumentationLibrarySpans[:0] + + for _, ils := range b.InstrumentationLibrarySpans { + notFoundSpans := ils.Spans[:0] + for _, s := range ils.Spans { + // if not already encountered, then keep + token := tokenForID(h, buffer, int32(s.Kind), s.SpanId) + _, ok := c.spans[token] + if !ok { + notFoundSpans = append(notFoundSpans, s) + c.spans[token] = struct{}{} + } + } + + if len(notFoundSpans) > 0 { + ils.Spans = notFoundSpans + notFoundILS = append(notFoundILS, ils) + } + } + + // if there were some spans not found in A, add everything left in the batch + if len(notFoundILS) > 0 { + b.InstrumentationLibrarySpans = notFoundILS + c.result.Batches = append(c.result.Batches, b) + } + } + + c.combined = true +} + +func (c *Combiner) Result() (*tempopb.Trace, int) { + spanCount := -1 + + if c.result != nil && c.combined { + // Only if anything combined + SortTrace(c.result) + spanCount = len(c.spans) + } + + return c.result, spanCount } diff --git a/pkg/model/trace/combine_test.go b/pkg/model/trace/combine_test.go index 6164fe9e1e0..5f79e2ca7fa 100644 --- a/pkg/model/trace/combine_test.go +++ b/pkg/model/trace/combine_test.go @@ -1,17 +1,32 @@ package trace import ( - "hash/fnv" + "bytes" + "crypto/rand" + "fmt" + "sort" + "strconv" "testing" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util/test" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestCombineProtoTotals(t *testing.T) { - sameTrace := test.MakeTraceWithSpanCount(10, 10, []byte{0x01, 0x03}) + methods := []func(a, b *tempopb.Trace) (*tempopb.Trace, int){ + CombineTraceProtos, + func(a, b *tempopb.Trace) (*tempopb.Trace, int) { + c := NewCombiner() + c.Consume(a) + c.Consume(b) + return c.Result() + }, + } + + sameTrace := test.MakeTraceWithSpanCount(10, 10, []byte{0x01, 0x03}) tests := []struct { traceA *tempopb.Trace traceB *tempopb.Trace @@ -40,13 +55,52 @@ func TestCombineProtoTotals(t *testing.T) { } for _, tt := range tests { - _, actualTotal := CombineTraceProtos(tt.traceA, tt.traceB) - assert.Equal(t, tt.expectedTotal, actualTotal) + for _, m := range methods { + _, actualTotal := m(tt.traceA, tt.traceB) + assert.Equal(t, tt.expectedTotal, actualTotal) + } + } +} + +func TestTokenForIDCollision(t *testing.T) { + + n := 100_000_0 + h := newHash() + buf := make([]byte, 4) + + tokens := map[token]struct{}{} + IDs := [][]byte{} + + spanID := make([]byte, 8) + for i := 0; i < n; i++ { + rand.Read(spanID) + + copy := append([]byte(nil), spanID...) + IDs = append(IDs, copy) + + tokens[tokenForID(h, buf, 0, spanID)] = struct{}{} + } + + // Ensure no duplicate span IDs accidentally generated + sort.Slice(IDs, func(i, j int) bool { + return bytes.Compare(IDs[i], IDs[j]) == -1 + }) + for i := 1; i < len(IDs); i++ { + if bytes.Equal(IDs[i-1], IDs[i]) { + panic("same span ID was generated, oops") + } + } + + missing := n - len(tokens) + if missing > 0 { + fmt.Printf("missing 1 out of every %.2f spans", float32(n)/float32(missing)) } + + require.Equal(t, n, len(tokens)) } func BenchmarkTokenForID(b *testing.B) { - h := fnv.New32() + h := newHash() id := []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08} buffer := make([]byte, 4) @@ -55,3 +109,55 @@ func BenchmarkTokenForID(b *testing.B) { _ = tokenForID(h, buffer, 0, id) } } + +func BenchmarkCombine(b *testing.B) { + parts := []int{2, 3, 4, 8} + requests := 100 + spansEach := 1000 + id := test.ValidTraceID(nil) + + methods := []struct { + name string + method func(traces []*tempopb.Trace) int + }{ + { + "CombineTraceProtos", + func(traces []*tempopb.Trace) int { + var tr *tempopb.Trace + var spanCount int + for _, t := range traces { + tr, spanCount = CombineTraceProtos(tr, t) + } + return spanCount + }}, + { + "Combiner", + func(traces []*tempopb.Trace) int { + c := NewCombiner() + c.ConsumeAll(traces...) + _, spanCount := c.Result() + return spanCount + }}, + } + for _, p := range parts { + b.Run(strconv.Itoa(p), func(b *testing.B) { + for _, m := range methods { + b.Run(m.name, func(b *testing.B) { + for n := 0; n < b.N; n++ { + + // Generate input data. Since combination is destructive + // this must be done each time. + b.StopTimer() + var traces []*tempopb.Trace + for i := 0; i < p; i++ { + traces = append(traces, test.MakeTraceWithSpanCount(requests, spansEach, id)) + } + b.StartTimer() + + m.method(traces) + } + }) + } + }) + } +} diff --git a/pkg/model/v1/object_decoder.go b/pkg/model/v1/object_decoder.go index 425a8c64a1e..2cc0e62e8e9 100644 --- a/pkg/model/v1/object_decoder.go +++ b/pkg/model/v1/object_decoder.go @@ -50,15 +50,16 @@ func (d *ObjectDecoder) Matches(id []byte, obj []byte, req *tempopb.SearchReques } func (d *ObjectDecoder) Combine(objs ...[]byte) ([]byte, error) { - var combinedTrace *tempopb.Trace + c := trace.NewCombiner() for _, obj := range objs { t, err := staticDecoder.PrepareForRead(obj) if err != nil { return nil, fmt.Errorf("error unmarshaling trace: %w", err) } - combinedTrace, _ = trace.CombineTraceProtos(combinedTrace, t) + c.Consume(t) } + combinedTrace, _ := c.Result() combinedBytes, err := d.Marshal(combinedTrace) if err != nil { diff --git a/pkg/model/v2/object_decoder.go b/pkg/model/v2/object_decoder.go index 4241e72ec78..55e45ac8f0a 100644 --- a/pkg/model/v2/object_decoder.go +++ b/pkg/model/v2/object_decoder.go @@ -92,7 +92,7 @@ func (d *ObjectDecoder) Combine(objs ...[]byte) ([]byte, error) { var minStart, maxEnd uint32 minStart = math.MaxUint32 - var combinedTrace *tempopb.Trace + c := trace.NewCombiner() for _, obj := range objs { t, err := d.PrepareForRead(obj) if err != nil { @@ -113,9 +113,11 @@ func (d *ObjectDecoder) Combine(objs ...[]byte) ([]byte, error) { } } - combinedTrace, _ = trace.CombineTraceProtos(combinedTrace, t) + c.Consume(t) } + combinedTrace, _ := c.Result() + traceBytes := &tempopb.TraceBytes{} bytes, err := proto.Marshal(combinedTrace) if err != nil { From 2a2bda50d50e2cb8a51e2a5f07b16285e7cf71fb Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Thu, 17 Feb 2022 10:56:09 -0500 Subject: [PATCH 2/6] Migrate all CombineTraceProtos to Combiner --- cmd/tempo-cli/cmd-query-blocks.go | 11 +++++---- modules/frontend/tracebyidsharding.go | 6 +++-- modules/querier/querier.go | 14 ++++++++---- pkg/model/combine_test.go | 17 -------------- pkg/model/trace/combine.go | 33 +++++++++++++++++++++------ pkg/model/trace/combine_test.go | 11 ++++++--- pkg/model/v1/object_decoder.go | 4 ++-- pkg/model/v1/segment_decoder.go | 8 ++++--- pkg/model/v2/object_decoder.go | 4 ++-- pkg/model/v2/segment_decoder.go | 8 ++++--- 10 files changed, 67 insertions(+), 49 deletions(-) diff --git a/cmd/tempo-cli/cmd-query-blocks.go b/cmd/tempo-cli/cmd-query-blocks.go index 3a44c2ee537..050fc2784b4 100644 --- a/cmd/tempo-cli/cmd-query-blocks.go +++ b/cmd/tempo-cli/cmd-query-blocks.go @@ -47,13 +47,13 @@ func (cmd *queryBlocksCmd) Run(ctx *globalOptions) error { } var ( - combinedTrace *tempopb.Trace - marshaller = new(jsonpb.Marshaler) - jsonBytes = bytes.Buffer{} + combiner = trace.NewCombiner() + marshaller = new(jsonpb.Marshaler) + jsonBytes = bytes.Buffer{} ) fmt.Println() - for _, result := range results { + for i, result := range results { fmt.Println(result.blockID, ":") err := marshaller.Marshal(&jsonBytes, result.trace) @@ -64,9 +64,10 @@ func (cmd *queryBlocksCmd) Run(ctx *globalOptions) error { fmt.Println(jsonBytes.String()) jsonBytes.Reset() - combinedTrace, _ = trace.CombineTraceProtos(result.trace, combinedTrace) + combiner.ConsumeWithFinal(result.trace, i == len(results)-1) } + combinedTrace, _ := combiner.Result() fmt.Println("combined:") err = marshaller.Marshal(&jsonBytes, combinedTrace) if err != nil { diff --git a/modules/frontend/tracebyidsharding.go b/modules/frontend/tracebyidsharding.go index d9ad9ea6e60..9f5fc9cdeb2 100644 --- a/modules/frontend/tracebyidsharding.go +++ b/modules/frontend/tracebyidsharding.go @@ -65,7 +65,8 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) { var overallError error var totalFailedBlocks uint32 - overallTrace := &tempopb.Trace{} + combiner := trace.NewCombiner() + combiner.Consume(&tempopb.Trace{}) // The query path returns a non-nil result even if no inputs (which is different than other paths which return nil for no inputs) statusCode := http.StatusNotFound statusMsg := "trace not found" @@ -139,7 +140,7 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) { // happy path statusCode = http.StatusOK - overallTrace, _ = trace.CombineTraceProtos(overallTrace, traceResp.Trace) + combiner.Consume(traceResp.Trace) }(req) } wg.Wait() @@ -148,6 +149,7 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) { return nil, overallError } + overallTrace, _ := combiner.Result() if overallTrace == nil || statusCode != http.StatusOK { // translate non-404s into 500s. if, for instance, we get a 400 back from an internal component // it means that we created a bad request. 400 should not be propagated back to the user b/c diff --git a/modules/querier/querier.go b/modules/querier/querier.go index 5e8e65af14f..b3194d9c33f 100644 --- a/modules/querier/querier.go +++ b/modules/querier/querier.go @@ -167,7 +167,7 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque span, ctx := opentracing.StartSpanFromContext(ctx, "Querier.FindTraceByID") defer span.Finish() - var completeTrace *tempopb.Trace + combiner := trace.NewCombiner() var spanCount, spanCountTotal, traceCountTotal int if req.QueryMode == QueryModeIngesters || req.QueryMode == QueryModeAll { replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read) @@ -184,16 +184,18 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque return nil, errors.Wrap(err, "error querying ingesters in Querier.FindTraceByID") } + found := false for _, r := range responses { t := r.response.(*tempopb.TraceByIDResponse).Trace if t != nil { - completeTrace, spanCount = trace.CombineTraceProtos(completeTrace, t) + spanCount = combiner.Consume(t) spanCountTotal += spanCount traceCountTotal++ + found = true } } span.LogFields(ot_log.String("msg", "done searching ingesters"), - ot_log.Bool("found", completeTrace != nil), + ot_log.Bool("found", found), ot_log.Int("combinedSpans", spanCountTotal), ot_log.Int("combinedTraces", traceCountTotal)) } @@ -225,17 +227,19 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque } } - completeTrace, spanCount = trace.CombineTraceProtos(completeTrace, storeTrace) + spanCount = combiner.Consume(storeTrace) spanCountTotal += spanCount traceCountTotal++ span.LogFields(ot_log.String("msg", "combined trace protos from store"), - ot_log.Bool("found", completeTrace != nil), + ot_log.Bool("found", len(partialTraces) > 0), ot_log.Int("combinedSpans", spanCountTotal), ot_log.Int("combinedTraces", traceCountTotal)) } } + completeTrace, _ := combiner.Result() + return &tempopb.TraceByIDResponse{ Trace: completeTrace, Metrics: &tempopb.TraceByIDMetrics{ diff --git a/pkg/model/combine_test.go b/pkg/model/combine_test.go index 211d364f9fc..0af14b44b8b 100644 --- a/pkg/model/combine_test.go +++ b/pkg/model/combine_test.go @@ -1,7 +1,6 @@ package model import ( - "fmt" "math/rand" "testing" @@ -102,22 +101,6 @@ func TestCombine(t *testing.T) { } } -func BenchmarkCombineTraceProtos(b *testing.B) { - sizes := []int{1, 10, 1000, 10000, 100000} - - for _, size := range sizes { - b.Run(fmt.Sprint(size), func(b *testing.B) { - t1 := test.MakeTraceWithSpanCount(1, size, []byte{0x01, 0x02}) - t2 := test.MakeTraceWithSpanCount(1, size, []byte{0x01, 0x03}) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - trace.CombineTraceProtos(t1, t2) - } - }) - } -} - func mustMarshalToObject(trace *tempopb.Trace, encoding string) []byte { return mustMarshalToObjectWithRange(trace, encoding, 0, 0) } diff --git a/pkg/model/trace/combine.go b/pkg/model/trace/combine.go index e6da1f33163..774ed7629e3 100644 --- a/pkg/model/trace/combine.go +++ b/pkg/model/trace/combine.go @@ -8,6 +8,7 @@ import ( "github.com/grafana/tempo/pkg/tempopb" ) +// Deprecated: This is only capable of pairwise combination. It is replaced by Combiner. // CombineTraceProtos combines two trace protos into one. Note that it is destructive. // All spans are combined into traceA. spanCountA, B, and Total are returned for // logging purposes. @@ -70,13 +71,17 @@ func CombineTraceProtos(traceA, traceB *tempopb.Trace) (*tempopb.Trace, int) { return traceA, spanCountTotal } +// token is unint64 to reduce hash collision rates. Experimentally, it was observed +// that fnv32 could approach a collision rate of 1 in 10,000. fnv64 avoids collisions +// when tested against traces with up to 1M spans (see matching test). A collision +// results in a dropped span during combine. type token uint64 func newHash() hash.Hash64 { return fnv.New64() } -// tokenForID returns a uint32 token for use in a hash map given a span id and span kind +// tokenForID returns a token for use in a hash map given a span id and span kind // buffer must be a 4 byte slice and is reused for writing the span kind to the hashing function // kind is used along with the actual id b/c in zipkin traces span id is not guaranteed to be unique // as it is shared between client and server spans. @@ -89,6 +94,11 @@ func tokenForID(h hash.Hash64, buffer []byte, kind int32, b []byte) token { return token(h.Sum64()) } +// Combiner combines multiple partial traces into one, deduping spans based on +// ID and kind. Note that it is destructive. There are several efficiency +// improvements over the previous pairwise CombineTraceProtos: +// * Only scan/hash the spans for each input once +// * Only sort the final result once. type Combiner struct { result *tempopb.Trace spans map[token]struct{} @@ -101,13 +111,14 @@ func NewCombiner() *Combiner { } } -func (c *Combiner) ConsumeAll(traces ...*tempopb.Trace) { - for _, t := range traces { - c.Consume(t) - } +// Consume the given trace and destructively combines its contents. +func (c *Combiner) Consume(tr *tempopb.Trace) (spanCount int) { + return c.ConsumeWithFinal(tr, false) } -func (c *Combiner) Consume(tr *tempopb.Trace) { +// ConsumeWithFinal consumes the trace, but allows for performance savings when +// it is known that this is the last expected input trace. +func (c *Combiner) ConsumeWithFinal(tr *tempopb.Trace, final bool) (spanCount int) { if tr == nil { return } @@ -140,12 +151,18 @@ func (c *Combiner) Consume(tr *tempopb.Trace) { _, ok := c.spans[token] if !ok { notFoundSpans = append(notFoundSpans, s) - c.spans[token] = struct{}{} + + // If last expected input, then we don't need to record + // the visited spans. Optimization has significant savings. + if !final { + c.spans[token] = struct{}{} + } } } if len(notFoundSpans) > 0 { ils.Spans = notFoundSpans + spanCount += len(notFoundSpans) notFoundILS = append(notFoundILS, ils) } } @@ -158,8 +175,10 @@ func (c *Combiner) Consume(tr *tempopb.Trace) { } c.combined = true + return } +// Result returns the final trace and span count. func (c *Combiner) Result() (*tempopb.Trace, int) { spanCount := -1 diff --git a/pkg/model/trace/combine_test.go b/pkg/model/trace/combine_test.go index 5f79e2ca7fa..22378ae7f56 100644 --- a/pkg/model/trace/combine_test.go +++ b/pkg/model/trace/combine_test.go @@ -64,7 +64,9 @@ func TestCombineProtoTotals(t *testing.T) { func TestTokenForIDCollision(t *testing.T) { - n := 100_000_0 + // Estimate the hash collision rate of tokenForID. + + n := 1_000_000 h := newHash() buf := make([]byte, 4) @@ -96,6 +98,7 @@ func TestTokenForIDCollision(t *testing.T) { fmt.Printf("missing 1 out of every %.2f spans", float32(n)/float32(missing)) } + // There shouldn't be any collisions. require.Equal(t, n, len(tokens)) } @@ -112,7 +115,7 @@ func BenchmarkTokenForID(b *testing.B) { func BenchmarkCombine(b *testing.B) { parts := []int{2, 3, 4, 8} - requests := 100 + requests := 100 // 100K spans per part spansEach := 1000 id := test.ValidTraceID(nil) @@ -134,7 +137,9 @@ func BenchmarkCombine(b *testing.B) { "Combiner", func(traces []*tempopb.Trace) int { c := NewCombiner() - c.ConsumeAll(traces...) + for i := range traces { + c.ConsumeWithFinal(traces[i], i == len(traces)-1) + } _, spanCount := c.Result() return spanCount }}, diff --git a/pkg/model/v1/object_decoder.go b/pkg/model/v1/object_decoder.go index 2cc0e62e8e9..d9a7a84ea4e 100644 --- a/pkg/model/v1/object_decoder.go +++ b/pkg/model/v1/object_decoder.go @@ -51,13 +51,13 @@ func (d *ObjectDecoder) Matches(id []byte, obj []byte, req *tempopb.SearchReques func (d *ObjectDecoder) Combine(objs ...[]byte) ([]byte, error) { c := trace.NewCombiner() - for _, obj := range objs { + for i, obj := range objs { t, err := staticDecoder.PrepareForRead(obj) if err != nil { return nil, fmt.Errorf("error unmarshaling trace: %w", err) } - c.Consume(t) + c.ConsumeWithFinal(t, i == len(obj)-1) } combinedTrace, _ := c.Result() diff --git a/pkg/model/v1/segment_decoder.go b/pkg/model/v1/segment_decoder.go index 7950847258a..ae3ff8869cf 100644 --- a/pkg/model/v1/segment_decoder.go +++ b/pkg/model/v1/segment_decoder.go @@ -25,17 +25,19 @@ func (d *SegmentDecoder) PrepareForWrite(trace *tempopb.Trace, start uint32, end func (d *SegmentDecoder) PrepareForRead(segments [][]byte) (*tempopb.Trace, error) { // each slice is a marshalled tempopb.Trace, unmarshal and combine - var combinedTrace *tempopb.Trace - for _, s := range segments { + combiner := trace.NewCombiner() + for i, s := range segments { t := &tempopb.Trace{} err := proto.Unmarshal(s, t) if err != nil { return nil, fmt.Errorf("error unmarshaling trace: %w", err) } - combinedTrace, _ = trace.CombineTraceProtos(combinedTrace, t) + combiner.ConsumeWithFinal(t, i == len(segments)-1) } + combinedTrace, _ := combiner.Result() + return combinedTrace, nil } diff --git a/pkg/model/v2/object_decoder.go b/pkg/model/v2/object_decoder.go index 55e45ac8f0a..97539740d83 100644 --- a/pkg/model/v2/object_decoder.go +++ b/pkg/model/v2/object_decoder.go @@ -93,7 +93,7 @@ func (d *ObjectDecoder) Combine(objs ...[]byte) ([]byte, error) { minStart = math.MaxUint32 c := trace.NewCombiner() - for _, obj := range objs { + for i, obj := range objs { t, err := d.PrepareForRead(obj) if err != nil { return nil, fmt.Errorf("error unmarshaling trace: %w", err) @@ -113,7 +113,7 @@ func (d *ObjectDecoder) Combine(objs ...[]byte) ([]byte, error) { } } - c.Consume(t) + c.ConsumeWithFinal(t, i == len(objs)-1) } combinedTrace, _ := c.Result() diff --git a/pkg/model/v2/segment_decoder.go b/pkg/model/v2/segment_decoder.go index 352c4e2d0bc..b85f4bc14ee 100644 --- a/pkg/model/v2/segment_decoder.go +++ b/pkg/model/v2/segment_decoder.go @@ -29,8 +29,8 @@ func (d *SegmentDecoder) PrepareForWrite(trace *tempopb.Trace, start uint32, end } func (d *SegmentDecoder) PrepareForRead(segments [][]byte) (*tempopb.Trace, error) { - var combinedTrace *tempopb.Trace - for _, obj := range segments { + combiner := trace.NewCombiner() + for i, obj := range segments { obj, _, _, err := stripStartEnd(obj) if err != nil { return nil, fmt.Errorf("error stripping start/end: %w", err) @@ -42,9 +42,11 @@ func (d *SegmentDecoder) PrepareForRead(segments [][]byte) (*tempopb.Trace, erro return nil, fmt.Errorf("error unmarshaling trace: %w", err) } - combinedTrace, _ = trace.CombineTraceProtos(combinedTrace, t) + combiner.ConsumeWithFinal(t, i == len(segments)-1) } + combinedTrace, _ := combiner.Result() + return combinedTrace, nil } From 6e9e0852618d45fe87fbb0e3b11f5d11808bd5de Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Thu, 17 Feb 2022 13:00:26 -0500 Subject: [PATCH 3/6] lint --- pkg/model/trace/combine.go | 1 + pkg/model/trace/combine_test.go | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/model/trace/combine.go b/pkg/model/trace/combine.go index 774ed7629e3..31e9e0629fd 100644 --- a/pkg/model/trace/combine.go +++ b/pkg/model/trace/combine.go @@ -8,6 +8,7 @@ import ( "github.com/grafana/tempo/pkg/tempopb" ) +// nolint: staticcheck // Deprecated: This is only capable of pairwise combination. It is replaced by Combiner. // CombineTraceProtos combines two trace protos into one. Note that it is destructive. // All spans are combined into traceA. spanCountA, B, and Total are returned for diff --git a/pkg/model/trace/combine_test.go b/pkg/model/trace/combine_test.go index 22378ae7f56..77bfdc5a09c 100644 --- a/pkg/model/trace/combine_test.go +++ b/pkg/model/trace/combine_test.go @@ -75,7 +75,8 @@ func TestTokenForIDCollision(t *testing.T) { spanID := make([]byte, 8) for i := 0; i < n; i++ { - rand.Read(spanID) + _, err := rand.Read(spanID) + require.NoError(t, err) copy := append([]byte(nil), spanID...) IDs = append(IDs, copy) From 2458b91d20dd88b755eb9e3ff6e7e032ad5f9eb9 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Fri, 18 Feb 2022 15:07:25 -0500 Subject: [PATCH 4/6] Review feedback, delete CombineTraceProtos --- pkg/model/combine.go | 5 ++- pkg/model/trace/combine.go | 75 +++------------------------------ pkg/model/trace/combine_test.go | 16 ++----- 3 files changed, 13 insertions(+), 83 deletions(-) diff --git a/pkg/model/combine.go b/pkg/model/combine.go index 2bfa41a536f..46fb7779282 100644 --- a/pkg/model/combine.go +++ b/pkg/model/combine.go @@ -63,7 +63,10 @@ func CombineForRead(obj []byte, dataEncoding string, t *tempopb.Trace) (*tempopb return nil, fmt.Errorf("error unmarshalling obj (%s): %w", dataEncoding, err) } - combined, _ := trace.CombineTraceProtos(objTrace, t) + c := trace.NewCombiner() + c.Consume(objTrace) + c.ConsumeWithFinal(t, true) + combined, _ := c.Result() return combined, nil } diff --git a/pkg/model/trace/combine.go b/pkg/model/trace/combine.go index 31e9e0629fd..32738d021ee 100644 --- a/pkg/model/trace/combine.go +++ b/pkg/model/trace/combine.go @@ -8,71 +8,7 @@ import ( "github.com/grafana/tempo/pkg/tempopb" ) -// nolint: staticcheck -// Deprecated: This is only capable of pairwise combination. It is replaced by Combiner. -// CombineTraceProtos combines two trace protos into one. Note that it is destructive. -// All spans are combined into traceA. spanCountA, B, and Total are returned for -// logging purposes. -func CombineTraceProtos(traceA, traceB *tempopb.Trace) (*tempopb.Trace, int) { - // if one or the other is nil just return 0 for the one that's nil and -1 for the other. this will be a clear indication this - // code path was taken without unnecessarily counting spans - if traceA == nil { - return traceB, -1 - } - - if traceB == nil { - return traceA, -1 - } - - spanCountTotal := 0 - - h := newHash() - buffer := make([]byte, 4) - - spansInA := make(map[token]struct{}) - for _, batchA := range traceA.Batches { - for _, ilsA := range batchA.InstrumentationLibrarySpans { - for _, spanA := range ilsA.Spans { - spansInA[tokenForID(h, buffer, int32(spanA.Kind), spanA.SpanId)] = struct{}{} - } - spanCountTotal += len(ilsA.Spans) - } - } - - // loop through every span and copy spans in B that don't exist to A - for _, batchB := range traceB.Batches { - notFoundILS := batchB.InstrumentationLibrarySpans[:0] - - for _, ilsB := range batchB.InstrumentationLibrarySpans { - notFoundSpans := ilsB.Spans[:0] - for _, spanB := range ilsB.Spans { - // if found in A, remove from the batch - _, ok := spansInA[tokenForID(h, buffer, int32(spanB.Kind), spanB.SpanId)] - if !ok { - notFoundSpans = append(notFoundSpans, spanB) - } - } - - if len(notFoundSpans) > 0 { - spanCountTotal += len(notFoundSpans) - ilsB.Spans = notFoundSpans - notFoundILS = append(notFoundILS, ilsB) - } - } - - // if there were some spans not found in A, add everything left in the batch - if len(notFoundILS) > 0 { - batchB.InstrumentationLibrarySpans = notFoundILS - traceA.Batches = append(traceA.Batches, batchB) - } - } - - SortTrace(traceA) - - return traceA, spanCountTotal -} - -// token is unint64 to reduce hash collision rates. Experimentally, it was observed +// token is uint64 to reduce hash collision rates. Experimentally, it was observed // that fnv32 could approach a collision rate of 1 in 10,000. fnv64 avoids collisions // when tested against traces with up to 1M spans (see matching test). A collision // results in a dropped span during combine. @@ -96,10 +32,11 @@ func tokenForID(h hash.Hash64, buffer []byte, kind int32, b []byte) token { } // Combiner combines multiple partial traces into one, deduping spans based on -// ID and kind. Note that it is destructive. There are several efficiency -// improvements over the previous pairwise CombineTraceProtos: -// * Only scan/hash the spans for each input once -// * Only sort the final result once. +// ID and kind. Note that it is destructive. There are design decisions for +// efficiency: +// * Only scan/hash the spans for each input once, which is reused across calls. +// * Only sort the final result once and if needed. +// * Don't scan/hash the spans for the last input (final=true). type Combiner struct { result *tempopb.Trace spans map[token]struct{} diff --git a/pkg/model/trace/combine_test.go b/pkg/model/trace/combine_test.go index 77bfdc5a09c..4cf1b2b4c3b 100644 --- a/pkg/model/trace/combine_test.go +++ b/pkg/model/trace/combine_test.go @@ -2,8 +2,8 @@ package trace import ( "bytes" - "crypto/rand" "fmt" + "math/rand" "sort" "strconv" "testing" @@ -17,7 +17,6 @@ import ( func TestCombineProtoTotals(t *testing.T) { methods := []func(a, b *tempopb.Trace) (*tempopb.Trace, int){ - CombineTraceProtos, func(a, b *tempopb.Trace) (*tempopb.Trace, int) { c := NewCombiner() c.Consume(a) @@ -124,16 +123,6 @@ func BenchmarkCombine(b *testing.B) { name string method func(traces []*tempopb.Trace) int }{ - { - "CombineTraceProtos", - func(traces []*tempopb.Trace) int { - var tr *tempopb.Trace - var spanCount int - for _, t := range traces { - tr, spanCount = CombineTraceProtos(tr, t) - } - return spanCount - }}, { "Combiner", func(traces []*tempopb.Trace) int { @@ -143,7 +132,8 @@ func BenchmarkCombine(b *testing.B) { } _, spanCount := c.Result() return spanCount - }}, + }, + }, } for _, p := range parts { b.Run(strconv.Itoa(p), func(b *testing.B) { From 1695a5096a7f18352d0e47a897633917a02ee454 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Wed, 23 Feb 2022 16:51:06 -0500 Subject: [PATCH 5/6] Wait until first call to alloc span map, using actual input size --- pkg/model/trace/combine.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/pkg/model/trace/combine.go b/pkg/model/trace/combine.go index 32738d021ee..0d28f7b0954 100644 --- a/pkg/model/trace/combine.go +++ b/pkg/model/trace/combine.go @@ -44,9 +44,7 @@ type Combiner struct { } func NewCombiner() *Combiner { - return &Combiner{ - spans: map[token]struct{}{}, - } + return &Combiner{} } // Consume the given trace and destructively combines its contents. @@ -67,6 +65,17 @@ func (c *Combiner) ConsumeWithFinal(tr *tempopb.Trace, final bool) (spanCount in // First call? if c.result == nil { c.result = tr + + // Pre-alloc map with input size. This saves having to grow the + // map from the small starting size. + n := 0 + for _, b := range c.result.Batches { + for _, ils := range b.InstrumentationLibrarySpans { + n += len(ils.Spans) + } + } + c.spans = make(map[token]struct{}, n) + for _, b := range c.result.Batches { for _, ils := range b.InstrumentationLibrarySpans { for _, s := range ils.Spans { From e4cf0cf3fbd4d9985f7e35ed82ec3edeffed88ac Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Thu, 24 Feb 2022 10:28:38 -0500 Subject: [PATCH 6/6] changelog --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bcc869640c9..e5f72ed3d57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,8 @@ distributors. Also, during this period, the ingesters will use considerably more resources and as such should be scaled up (or incoming traffic should be heavily throttled). Once all distributors and ingesters have rolled performance will return to normal. Internally we have observed ~1.5x CPU load on the ingesters during the rollout. [#1227](https://github.com/grafana/tempo/pull/1227) (@joe-elliott) -* [ENHACEMENT] Enterprise jsonnet: add config to create tokengen job explicitly [#1256](https://github.com/grafana/tempo/pull/1256) (@kvrhdn) +* [ENHANCEMENT] Enterprise jsonnet: add config to create tokengen job explicitly [#1256](https://github.com/grafana/tempo/pull/1256) (@kvrhdn) +* [ENHANCEMENT] Make trace combination/compaction more efficient [#1291](https://github.com/grafana/tempo/pull/1291) (@mdisibio) * [BUGFIX]: Remove unnecessary PersistentVolumeClaim [#1245](https://github.com/grafana/tempo/issues/1245) * [BUGFIX] Fixed issue when query-frontend doesn't log request details when request is cancelled [#1136](https://github.com/grafana/tempo/issues/1136) (@adityapwr)