Skip to content

Commit

Permalink
Move trace sdk tests from trace_test into trace package (#6400)
Browse files Browse the repository at this point in the history
I would like to be able to use a private option in
#6393 in tests,
and decided to split this refactoring out into its own PR.

This moves the batch span processor benchmarks into benchmark_test.go,
and replaces one instance of the tracetest.NewInMemoryExporter with a
different test exporter implementation. It then moves most unit tests
from `trace_test` to the main `trace` package.
  • Loading branch information
dashpole authored Mar 5, 2025
1 parent 38f4f39 commit e2aee3a
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 171 deletions.
171 changes: 53 additions & 118 deletions sdk/trace/batch_span_processor_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package trace_test
package trace

import (
"context"
Expand All @@ -12,22 +12,17 @@ import (
"testing"
"time"

"github.com/go-logr/logr"
"github.com/go-logr/logr/funcr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/internal/env"
ottest "go.opentelemetry.io/otel/sdk/internal/internaltest"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
)

type testBatchExporter struct {
mu sync.Mutex
spans []sdktrace.ReadOnlySpan
spans []ReadOnlySpan
sizes []int
batchCount int
shutdownCount int
Expand All @@ -37,7 +32,7 @@ type testBatchExporter struct {
err error
}

func (t *testBatchExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
func (t *testBatchExporter) ExportSpans(ctx context.Context, spans []ReadOnlySpan) error {
t.mu.Lock()
defer t.mu.Unlock()

Expand Down Expand Up @@ -78,20 +73,20 @@ func (t *testBatchExporter) getBatchCount() int {
return t.batchCount
}

var _ sdktrace.SpanExporter = (*testBatchExporter)(nil)
var _ SpanExporter = (*testBatchExporter)(nil)

func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) {
tp := basicTracerProvider(t)
bsp := sdktrace.NewBatchSpanProcessor(nil)
bsp := NewBatchSpanProcessor(nil)
tp.RegisterSpanProcessor(bsp)
tr := tp.Tracer("NilExporter")

_, span := tr.Start(context.Background(), "foo")
span.End()

// These should not panic.
bsp.OnStart(context.Background(), span.(sdktrace.ReadWriteSpan))
bsp.OnEnd(span.(sdktrace.ReadOnlySpan))
bsp.OnStart(context.Background(), span.(ReadWriteSpan))
bsp.OnEnd(span.(ReadOnlySpan))
if err := bsp.ForceFlush(context.Background()); err != nil {
t.Errorf("failed to ForceFlush the BatchSpanProcessor: %v", err)
}
Expand All @@ -102,7 +97,7 @@ func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) {

type testOption struct {
name string
o []sdktrace.BatchSpanProcessorOption
o []BatchSpanProcessorOption
wantNumSpans int
wantBatchCount int
genNumSpans int
Expand All @@ -121,50 +116,50 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
},
{
name: "non-default BatchTimeout",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithBatchTimeout(schDelay),
o: []BatchSpanProcessorOption{
WithBatchTimeout(schDelay),
},
wantNumSpans: 2053,
wantBatchCount: 4,
genNumSpans: 2053,
},
{
name: "non-default MaxQueueSize and BatchTimeout",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithBatchTimeout(schDelay),
sdktrace.WithMaxQueueSize(200),
o: []BatchSpanProcessorOption{
WithBatchTimeout(schDelay),
WithMaxQueueSize(200),
},
wantNumSpans: 205,
wantBatchCount: 1,
genNumSpans: 205,
},
{
name: "non-default MaxQueueSize, BatchTimeout and MaxExportBatchSize",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithBatchTimeout(schDelay),
sdktrace.WithMaxQueueSize(205),
sdktrace.WithMaxExportBatchSize(20),
o: []BatchSpanProcessorOption{
WithBatchTimeout(schDelay),
WithMaxQueueSize(205),
WithMaxExportBatchSize(20),
},
wantNumSpans: 210,
wantBatchCount: 11,
genNumSpans: 210,
},
{
name: "blocking option",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithBatchTimeout(schDelay),
sdktrace.WithMaxQueueSize(200),
sdktrace.WithMaxExportBatchSize(20),
o: []BatchSpanProcessorOption{
WithBatchTimeout(schDelay),
WithMaxQueueSize(200),
WithMaxExportBatchSize(20),
},
wantNumSpans: 205,
wantBatchCount: 11,
genNumSpans: 205,
},
{
name: "parallel span generation",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithBatchTimeout(schDelay),
sdktrace.WithMaxQueueSize(200),
o: []BatchSpanProcessorOption{
WithBatchTimeout(schDelay),
WithMaxQueueSize(200),
},
wantNumSpans: 205,
wantBatchCount: 1,
Expand All @@ -173,9 +168,9 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
},
{
name: "parallel span blocking",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithBatchTimeout(schDelay),
sdktrace.WithMaxExportBatchSize(200),
o: []BatchSpanProcessorOption{
WithBatchTimeout(schDelay),
WithMaxExportBatchSize(200),
},
wantNumSpans: 2000,
wantBatchCount: 10,
Expand Down Expand Up @@ -306,19 +301,19 @@ type stuckExporter struct {
}

// ExportSpans waits for ctx to expire and returns that error.
func (e *stuckExporter) ExportSpans(ctx context.Context, _ []sdktrace.ReadOnlySpan) error {
func (e *stuckExporter) ExportSpans(ctx context.Context, _ []ReadOnlySpan) error {
<-ctx.Done()
e.err = ctx.Err()
return ctx.Err()
}

func TestBatchSpanProcessorExportTimeout(t *testing.T) {
exp := new(stuckExporter)
bsp := sdktrace.NewBatchSpanProcessor(
bsp := NewBatchSpanProcessor(
exp,
// Set a non-zero export timeout so a deadline is set.
sdktrace.WithExportTimeout(1*time.Microsecond),
sdktrace.WithBlocking(),
WithExportTimeout(1*time.Microsecond),
WithBlocking(),
)
tp := basicTracerProvider(t)
tp.RegisterSpanProcessor(bsp)
Expand All @@ -332,10 +327,10 @@ func TestBatchSpanProcessorExportTimeout(t *testing.T) {
}
}

func createAndRegisterBatchSP(option testOption, te *testBatchExporter) sdktrace.SpanProcessor {
func createAndRegisterBatchSP(option testOption, te *testBatchExporter) SpanProcessor {
// Always use blocking queue to avoid flaky tests.
options := append(option.o, sdktrace.WithBlocking())
return sdktrace.NewBatchSpanProcessor(te, options...)
options := append(option.o, WithBlocking())
return NewBatchSpanProcessor(te, options...)
}

func generateSpan(_ *testing.T, tr trace.Tracer, option testOption) {
Expand Down Expand Up @@ -382,7 +377,7 @@ func getSpanContext() trace.SpanContext {

func TestBatchSpanProcessorShutdown(t *testing.T) {
var bp testBatchExporter
bsp := sdktrace.NewBatchSpanProcessor(&bp)
bsp := NewBatchSpanProcessor(&bp)

err := bsp.Shutdown(context.Background())
if err != nil {
Expand All @@ -401,14 +396,14 @@ func TestBatchSpanProcessorShutdown(t *testing.T) {
func TestBatchSpanProcessorPostShutdown(t *testing.T) {
tp := basicTracerProvider(t)
be := testBatchExporter{}
bsp := sdktrace.NewBatchSpanProcessor(&be)
bsp := NewBatchSpanProcessor(&be)

tp.RegisterSpanProcessor(bsp)
tr := tp.Tracer("Normal")

generateSpanParallel(t, tr, testOption{
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithMaxExportBatchSize(50),
o: []BatchSpanProcessorOption{
WithMaxExportBatchSize(50),
},
genNumSpans: 60,
})
Expand All @@ -428,9 +423,9 @@ func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) {
tp := basicTracerProvider(t)
option := testOption{
name: "default BatchSpanProcessorOptions",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithMaxQueueSize(0),
sdktrace.WithMaxExportBatchSize(3000),
o: []BatchSpanProcessorOption{
WithMaxQueueSize(0),
WithMaxExportBatchSize(3000),
},
wantNumSpans: 2053,
wantBatchCount: 1,
Expand Down Expand Up @@ -468,9 +463,9 @@ func TestBatchSpanProcessorDropBatchIfFailed(t *testing.T) {
}
tp := basicTracerProvider(t)
option := testOption{
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithMaxQueueSize(0),
sdktrace.WithMaxExportBatchSize(2000),
o: []BatchSpanProcessorOption{
WithMaxQueueSize(0),
WithMaxExportBatchSize(2000),
},
wantNumSpans: 1000,
wantBatchCount: 1,
Expand Down Expand Up @@ -545,7 +540,7 @@ func (e indefiniteExporter) Shutdown(context.Context) error {
return nil
}

func (e indefiniteExporter) ExportSpans(ctx context.Context, _ []sdktrace.ReadOnlySpan) error {
func (e indefiniteExporter) ExportSpans(ctx context.Context, _ []ReadOnlySpan) error {
<-e.stop
return ctx.Err()
}
Expand All @@ -555,7 +550,7 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) {
// Cancel the context
cancel()

bsp := sdktrace.NewBatchSpanProcessor(newIndefiniteExporter(t))
bsp := NewBatchSpanProcessor(newIndefiniteExporter(t))
t.Cleanup(func() {
assert.NoError(t, bsp.Shutdown(context.Background()))
})
Expand All @@ -568,7 +563,7 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) {
func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) {
tp := basicTracerProvider(t)
exp := newIndefiniteExporter(t)
bsp := sdktrace.NewBatchSpanProcessor(exp)
bsp := NewBatchSpanProcessor(exp)
tp.RegisterSpanProcessor(bsp)
tr := tp.Tracer(t.Name())
_, span := tr.Start(context.Background(), "foo")
Expand All @@ -586,11 +581,10 @@ func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) {
func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) {
ctx := context.Background()

exp := tracetest.NewInMemoryExporter()

tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
)
var bp testBatchExporter
bsp := NewBatchSpanProcessor(&bp)
tp := basicTracerProvider(t)
tp.RegisterSpanProcessor(bsp)
t.Cleanup(func() {
assert.NoError(t, tp.Shutdown(context.Background()))
})
Expand All @@ -604,14 +598,14 @@ func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) {
err := tp.ForceFlush(ctx)
assert.NoError(t, err)

assert.Len(t, exp.GetSpans(), i+1)
assert.Len(t, bp.spans, i+1)
}
}

func TestBatchSpanProcessorConcurrentSafe(t *testing.T) {
ctx := context.Background()
var bp testBatchExporter
bsp := sdktrace.NewBatchSpanProcessor(&bp)
bsp := NewBatchSpanProcessor(&bp)
tp := basicTracerProvider(t)
tp.RegisterSpanProcessor(bsp)
tr := tp.Tracer(t.Name())
Expand Down Expand Up @@ -650,62 +644,3 @@ func TestBatchSpanProcessorConcurrentSafe(t *testing.T) {

wg.Wait()
}

func BenchmarkSpanProcessorOnEnd(b *testing.B) {
for _, bb := range []struct {
batchSize int
spansCount int
}{
{batchSize: 10, spansCount: 10},
{batchSize: 10, spansCount: 100},
{batchSize: 100, spansCount: 10},
{batchSize: 100, spansCount: 100},
} {
b.Run(fmt.Sprintf("batch: %d, spans: %d", bb.batchSize, bb.spansCount), func(b *testing.B) {
bsp := sdktrace.NewBatchSpanProcessor(
tracetest.NewNoopExporter(),
sdktrace.WithMaxExportBatchSize(bb.batchSize),
)
b.Cleanup(func() {
_ = bsp.Shutdown(context.Background())
})
snap := tracetest.SpanStub{}.Snapshot()

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
// Ensure the export happens for every run
for j := 0; j < bb.spansCount; j++ {
bsp.OnEnd(snap)
}
}
})
}
}

func BenchmarkSpanProcessorVerboseLogging(b *testing.B) {
b.Cleanup(func(l logr.Logger) func() {
return func() { global.SetLogger(l) }
}(global.GetLogger()))
global.SetLogger(funcr.New(func(prefix, args string) {}, funcr.Options{Verbosity: 5}))
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(
tracetest.NewNoopExporter(),
sdktrace.WithMaxExportBatchSize(10),
))
b.Cleanup(func() {
_ = tp.Shutdown(context.Background())
})
tracer := tp.Tracer("bench")
ctx := context.Background()

b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
for j := 0; j < 10; j++ {
_, span := tracer.Start(ctx, "bench")
span.End()
}
}
}
Loading

0 comments on commit e2aee3a

Please sign in to comment.