Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make spans fit an envelope size #4022

Merged
merged 4 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions agent/workers/envelope/spans_envelope.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package envelope

import (
agentProto "github.com/kubeshop/tracetest/agent/proto"
"google.golang.org/protobuf/proto"
)

// EnvelopeSpans get a list of spans and batch them into a packet that does not
// surpasses the maxPacketSize restriction. When maxPacketSize is reached, no
// more spans are added to the packet,
func EnvelopeSpans(spans []*agentProto.Span, maxPacketSize int) []*agentProto.Span {
envelope := make([]*agentProto.Span, 0, len(spans))
currentSize := 0

// There's a weird scenario that must be covered here: imagine a span so big it is bigger than maxPacketSize.
// It is impossible to send a span like this, so in this case, we classify those spans as "large spans" and we allow
// `largeSpansPerPacket` per packet.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great point, I believe there is an issue around this very same subject in one of the otel collector repos, I'm glad you thought about it too!

//
// It is important to ensure a limit of large spans per packet because if your whole trace is composed by
// large spans, this would mean a packet would hold the entiry trace and we don't want that to happen.
const largeSpansPerPacket = 1
numberLargeSpansAddedToPacket := 0

for _, span := range spans {
spanSize := proto.Size(span)
isLargeSpan := spanSize > maxPacketSize
if currentSize+spanSize < maxPacketSize || isLargeSpan {
if isLargeSpan {
if numberLargeSpansAddedToPacket >= largeSpansPerPacket {
// there is already the limit of large spans in the packet, skip this one
continue
}

numberLargeSpansAddedToPacket++
}
envelope = append(envelope, span)
currentSize += spanSize
}
}

return envelope
}
163 changes: 163 additions & 0 deletions agent/workers/envelope/spans_envelope_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package envelope_test

import (
"testing"
"time"

"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/workers/envelope"
"github.com/kubeshop/tracetest/server/pkg/id"
"github.com/stretchr/testify/require"
"gotest.tools/v3/assert"
)

func TestSpanEnvelope(t *testing.T) {
// each of these spans take 73 bytes
spans := []*proto.Span{
createSpan(),
createSpan(),
}

envelope := envelope.EnvelopeSpans(spans, 150)
// 2 * 73 < 150, so all spans should be included in the envelope
require.Len(t, envelope, 2)
}

func TestSpanEnvelopeShouldIgnoreRestOfSpansIfTheyDontFit(t *testing.T) {
// each of these spans take 73 bytes
span1, span2, span3 := createSpan(), createSpan(), createSpan()
spans := []*proto.Span{
span1,
span2,
span3,
}

envelope := envelope.EnvelopeSpans(spans, 150)
// 3 * 73 > 150, but 2 spans fit the envelope, so take 2 spans instead
require.Len(t, envelope, 2)
assert.Equal(t, envelope[0].Id, span1.Id)
assert.Equal(t, envelope[1].Id, span2.Id)
}

func TestSpanEnvelopeShouldFitAsManySpansAsPossible(t *testing.T) {
// these spans take 73 bytes, 73 bytes, and 33 bytes respectively
span1, span2, span3 := createSpan(), createSpan(), createSmallSpan()
spans := []*proto.Span{
span1,
span2,
span3,
}

envelope := envelope.EnvelopeSpans(spans, 110)
// 73+73+33 > 110, 73+73 is also bigger than 110. But we can add 73 (span1) + 33 (span3) to fit the envelope
require.Len(t, envelope, 2)
assert.Equal(t, envelope[0].Id, span1.Id)
assert.Equal(t, envelope[1].Id, span3.Id)
}

func TestSpanEnvelopeShouldAllowLargeSpans(t *testing.T) {
// a large span is 682 bytes long, in theory, it should not fit the envelope, however,
// we should allow 1 per envelope just to make sure ALL spans are sent.
spans := []*proto.Span{
createLargeSpan(),
}

envelope := envelope.EnvelopeSpans(spans, 110)
require.Len(t, envelope, 1)
}

func TestSpanEnvelopeShouldOnlyAllowOneLargeSpan(t *testing.T) {
// a large span is 682 bytes long, in theory, it should not fit the envelope, however,
// we should allow 1 per envelope just to make sure ALL spans are sent.
largeSpan1, largeSpan2 := createLargeSpan(), createLargeSpan()
spans := []*proto.Span{
largeSpan1,
largeSpan2,
}

envelope := envelope.EnvelopeSpans(spans, 110)
require.Len(t, envelope, 1)
assert.Equal(t, largeSpan1.Id, envelope[0].Id)
}

func TestSpanEnvelopeShouldAddALargeSpanEvenIfThereAreMoreSpansInIt(t *testing.T) {
// Given a list of small spans that should be able to fit the envelope, but also a large span that doesn't fit an envelope,
// it should include the largeSpan with the 2 first spans and leave the third small span out
smallSpan1, smallSpan2, largeSpan1, smallSpan3 := createSmallSpan(), createSmallSpan(), createLargeSpan(), createSmallSpan()
spans := []*proto.Span{
smallSpan1,
smallSpan2,
largeSpan1,
smallSpan3,
}

envelope := envelope.EnvelopeSpans(spans, 100)
require.Len(t, envelope, 3)
assert.Equal(t, smallSpan1.Id, envelope[0].Id)
assert.Equal(t, smallSpan2.Id, envelope[1].Id)
assert.Equal(t, largeSpan1.Id, envelope[2].Id)
}

func TestSpanEnvelopeShouldIgnoreExtraLargeSpan(t *testing.T) {
// Given a list of small spans that should be able to fit the envelope, but also a large span that doesn't fit an envelope,
// it should include the largeSpan with the 2 first spans and leave the third small span out
smallSpan1, smallSpan2, largeSpan1, smallSpan3, largeSpan2 := createSmallSpan(), createSmallSpan(), createLargeSpan(), createSmallSpan(), createLargeSpan()
spans := []*proto.Span{
smallSpan1,
smallSpan2,
largeSpan1,
smallSpan3,
largeSpan2,
}

envelope := envelope.EnvelopeSpans(spans, 100)
require.Len(t, envelope, 3)
assert.Equal(t, smallSpan1.Id, envelope[0].Id)
assert.Equal(t, smallSpan2.Id, envelope[1].Id)
assert.Equal(t, largeSpan1.Id, envelope[2].Id)
}

func createSpan() *proto.Span {
return &proto.Span{
Id: id.NewRandGenerator().SpanID().String(),
Name: "span name",
StartTime: time.Now().Unix(),
EndTime: time.Now().Add(2 * time.Second).Unix(),
Kind: "internal",
Attributes: []*proto.KeyValuePair{
{
Key: "service.name",
Value: "core",
},
},
}
}

func createSmallSpan() *proto.Span {
return &proto.Span{
Id: id.NewRandGenerator().SpanID().String(),
Name: "s",
StartTime: time.Now().Unix(),
EndTime: time.Now().Add(2 * time.Second).Unix(),
Kind: "",
Attributes: []*proto.KeyValuePair{},
}
}

func createLargeSpan() *proto.Span {
loremIpsum := "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Vestibulum eu fermentum elit. Ut convallis elit nisl, et porttitor ante dignissim quis. Curabitur porttitor molestie iaculis. Suspendisse potenti. Curabitur sollicitudin finibus mollis. Nunc at tincidunt dolor. Nam eleifend ante in elit vulputate lacinia. Donec sem orci, luctus ut eros id, tincidunt elementum nulla. Nulla et nibh pharetra, pretium odio nec, posuere est. Curabitur a felis ut risus fermentum ornare vitae sed dolor. Mauris non velit at nulla ultricies mattis. "
return &proto.Span{
Id: id.NewRandGenerator().SpanID().String(),
Name: loremIpsum,
StartTime: time.Now().Unix(),
EndTime: time.Now().Add(2 * time.Second).Unix(),
Kind: "internal",
Attributes: []*proto.KeyValuePair{
{Key: "service.name", Value: "core"},
{Key: "service.team", Value: "ranchers"},
{Key: "go.version", Value: "1.22.3"},
{Key: "go.os", Value: "Linux"},
{Key: "go.arch", Value: "amd64"},
},
}
}
5 changes: 4 additions & 1 deletion agent/workers/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/kubeshop/tracetest/agent/telemetry"
"github.com/kubeshop/tracetest/agent/tracedb"
"github.com/kubeshop/tracetest/agent/tracedb/connection"
"github.com/kubeshop/tracetest/agent/workers/envelope"
"github.com/kubeshop/tracetest/agent/workers/poller"
"github.com/kubeshop/tracetest/server/datastore"
"github.com/kubeshop/tracetest/server/executor"
Expand All @@ -24,6 +25,8 @@ import (
"go.uber.org/zap"
)

const maxEnvelopeSize = 3 * 1024 * 1024 // 3MB

type PollerWorker struct {
client *client.Client
inmemoryDatastore tracedb.TraceDB
Expand Down Expand Up @@ -222,7 +225,7 @@ func (w *PollerWorker) poll(ctx context.Context, request *proto.PollingRequest)
w.logger.Debug("Span was already sent", zap.String("runKey", runKey))
}
}
pollingResponse.Spans = newSpans
pollingResponse.Spans = envelope.EnvelopeSpans(newSpans, maxEnvelopeSize)

w.logger.Debug("Filtered spans", zap.Any("pollingResponse", spew.Sdump(pollingResponse)))
}
Expand Down
Loading