Skip to content

Commit

Permalink
[chore] Remove fake-request duplicate test code (#12113)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Jan 19, 2025
1 parent 08ca645 commit f07ebc3
Show file tree
Hide file tree
Showing 8 changed files with 307 additions and 397 deletions.
223 changes: 112 additions & 111 deletions exporter/exporterhelper/internal/batch_sender_test.go

Large diffs are not rendered by default.

118 changes: 4 additions & 114 deletions exporter/exporterhelper/internal/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,138 +5,28 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe

import (
"context"
"sync/atomic"
"time"

"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/internal"
"go.opentelemetry.io/collector/exporter/internal/requesttest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
)

type fakeRequestSink struct {
requestsCount *atomic.Int64
itemsCount *atomic.Int64
}

func newFakeRequestSink() *fakeRequestSink {
return &fakeRequestSink{
requestsCount: new(atomic.Int64),
itemsCount: new(atomic.Int64),
}
}

type fakeRequest struct {
items int
exportErr error
mergeErr error
delay time.Duration
sink *fakeRequestSink
}

func (r *fakeRequest) Export(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(r.delay):
}
if r.exportErr != nil {
return r.exportErr
}
if r.sink != nil {
r.sink.requestsCount.Add(1)
r.sink.itemsCount.Add(int64(r.items))
}
return nil
}

func (r *fakeRequest) ItemsCount() int {
return r.items
}

func (r *fakeRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 internal.Request) ([]internal.Request, error) {
if r.mergeErr != nil {
return nil, r.mergeErr
}

maxItems := cfg.MaxSizeItems
if maxItems == 0 {
fr2 := r2.(*fakeRequest)
if fr2.mergeErr != nil {
return nil, fr2.mergeErr
}
return []internal.Request{
&fakeRequest{
items: r.items + fr2.items,
sink: r.sink,
exportErr: fr2.exportErr,
delay: r.delay + fr2.delay,
},
}, nil
}

var fr2 *fakeRequest
if r2 == nil {
fr2 = &fakeRequest{sink: r.sink, exportErr: r.exportErr, delay: r.delay}
} else {
if r2.(*fakeRequest).mergeErr != nil {
return nil, r2.(*fakeRequest).mergeErr
}
fr2 = r2.(*fakeRequest)
fr2 = &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay}
}
var res []internal.Request

// fill fr1 to maxItems if it's not nil

r = &fakeRequest{items: r.items, sink: r.sink, exportErr: r.exportErr, delay: r.delay}
if fr2.items <= maxItems-r.items {
r.items += fr2.items
if fr2.exportErr != nil {
r.exportErr = fr2.exportErr
}
return []internal.Request{r}, nil
}
// if split is needed, we don't propagate exportErr from fr2 to fr1 to test more cases
fr2.items -= maxItems - r.items
r.items = maxItems
res = append(res, r)

// split fr2 to maxItems
for {
if fr2.items <= maxItems {
res = append(res, &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay})
break
}
res = append(res, &fakeRequest{items: maxItems, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay})
fr2.items -= maxItems
}

return res, nil
}

func RequestFromMetricsFunc(reqErr error) func(context.Context, pmetric.Metrics) (internal.Request, error) {
return func(_ context.Context, md pmetric.Metrics) (internal.Request, error) {
return &fakeRequest{items: md.DataPointCount(), exportErr: reqErr}, nil
return &requesttest.FakeRequest{Items: md.DataPointCount(), ExportErr: reqErr}, nil
}
}

func RequestFromTracesFunc(reqErr error) func(context.Context, ptrace.Traces) (internal.Request, error) {
return func(_ context.Context, td ptrace.Traces) (internal.Request, error) {
return &fakeRequest{items: td.SpanCount(), exportErr: reqErr}, nil
return &requesttest.FakeRequest{Items: td.SpanCount(), ExportErr: reqErr}, nil
}
}

func RequestFromLogsFunc(reqErr error) func(context.Context, plog.Logs) (internal.Request, error) {
return func(_ context.Context, ld plog.Logs) (internal.Request, error) {
return &fakeRequest{items: ld.LogRecordCount(), exportErr: reqErr}, nil
}
}

func RequestFromProfilesFunc(reqErr error) func(context.Context, pprofile.Profiles) (internal.Request, error) {
return func(_ context.Context, pd pprofile.Profiles) (internal.Request, error) {
return &fakeRequest{items: pd.SampleCount(), exportErr: reqErr}, nil
return &requesttest.FakeRequest{Items: ld.LogRecordCount(), ExportErr: reqErr}, nil
}
}
23 changes: 15 additions & 8 deletions exporter/exporterhelper/xexporterhelper/profiles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal/requesttest"
"go.opentelemetry.io/collector/exporter/internal/storagetest"
"go.opentelemetry.io/collector/exporter/xexporter"
"go.opentelemetry.io/collector/pdata/pprofile"
Expand Down Expand Up @@ -66,7 +67,7 @@ func TestProfilesExporter_NilLogger(t *testing.T) {
}

func TestProfilesRequestExporter_NilLogger(t *testing.T) {
le, err := NewProfilesRequestExporter(context.Background(), exporter.Settings{}, internal.RequestFromProfilesFunc(nil))
le, err := NewProfilesRequestExporter(context.Background(), exporter.Settings{}, requestFromProfilesFunc(nil))
require.Nil(t, le)
require.Equal(t, errNilLogger, err)
}
Expand Down Expand Up @@ -98,7 +99,7 @@ func TestProfilesExporter_Default(t *testing.T) {
func TestProfilesRequestExporter_Default(t *testing.T) {
ld := pprofile.NewProfiles()
le, err := NewProfilesRequestExporter(context.Background(), exportertest.NewNopSettings(),
internal.RequestFromProfilesFunc(nil))
requestFromProfilesFunc(nil))
assert.NotNil(t, le)
require.NoError(t, err)

Expand All @@ -120,7 +121,7 @@ func TestProfilesExporter_WithCapabilities(t *testing.T) {
func TestProfilesRequestExporter_WithCapabilities(t *testing.T) {
capabilities := consumer.Capabilities{MutatesData: true}
le, err := NewProfilesRequestExporter(context.Background(), exportertest.NewNopSettings(),
internal.RequestFromProfilesFunc(nil), exporterhelper.WithCapabilities(capabilities))
requestFromProfilesFunc(nil), exporterhelper.WithCapabilities(capabilities))
require.NoError(t, err)
require.NotNil(t, le)

Expand Down Expand Up @@ -152,7 +153,7 @@ func TestProfilesRequestExporter_Default_ExportError(t *testing.T) {
ld := pprofile.NewProfiles()
want := errors.New("export_error")
le, err := NewProfilesRequestExporter(context.Background(), exportertest.NewNopSettings(),
internal.RequestFromProfilesFunc(want))
requestFromProfilesFunc(want))
require.NoError(t, err)
require.NotNil(t, le)
require.Equal(t, want, le.ConsumeProfiles(context.Background(), ld))
Expand Down Expand Up @@ -202,7 +203,7 @@ func TestProfilesRequestExporter_WithSpan(t *testing.T) {
otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(nooptrace.NewTracerProvider())

le, err := NewProfilesRequestExporter(context.Background(), set, internal.RequestFromProfilesFunc(nil))
le, err := NewProfilesRequestExporter(context.Background(), set, requestFromProfilesFunc(nil))
require.NoError(t, err)
require.NotNil(t, le)
checkWrapSpanForProfilesExporter(t, sr, set.TracerProvider.Tracer("test"), le, nil)
Expand Down Expand Up @@ -230,7 +231,7 @@ func TestProfilesRequestExporter_WithSpan_ReturnError(t *testing.T) {
defer otel.SetTracerProvider(nooptrace.NewTracerProvider())

want := errors.New("my_error")
le, err := NewProfilesRequestExporter(context.Background(), set, internal.RequestFromProfilesFunc(want))
le, err := NewProfilesRequestExporter(context.Background(), set, requestFromProfilesFunc(want))
require.NoError(t, err)
require.NotNil(t, le)
checkWrapSpanForProfilesExporter(t, sr, set.TracerProvider.Tracer("test"), le, want)
Expand All @@ -253,7 +254,7 @@ func TestProfilesRequestExporter_WithShutdown(t *testing.T) {
shutdown := func(context.Context) error { shutdownCalled = true; return nil }

le, err := NewProfilesRequestExporter(context.Background(), exportertest.NewNopSettings(),
internal.RequestFromProfilesFunc(nil), exporterhelper.WithShutdown(shutdown))
requestFromProfilesFunc(nil), exporterhelper.WithShutdown(shutdown))
assert.NotNil(t, le)
require.NoError(t, err)

Expand All @@ -277,7 +278,7 @@ func TestProfilesRequestExporter_WithShutdown_ReturnError(t *testing.T) {
shutdownErr := func(context.Context) error { return want }

le, err := NewProfilesRequestExporter(context.Background(), exportertest.NewNopSettings(),
internal.RequestFromProfilesFunc(nil), exporterhelper.WithShutdown(shutdownErr))
requestFromProfilesFunc(nil), exporterhelper.WithShutdown(shutdownErr))
assert.NotNil(t, le)
require.NoError(t, err)

Expand Down Expand Up @@ -323,3 +324,9 @@ func checkWrapSpanForProfilesExporter(t *testing.T, sr *tracetest.SpanRecorder,
require.Containsf(t, sd.Attributes(), attribute.KeyValue{Key: internal.FailedToSendSamplesKey, Value: attribute.Int64Value(failedToSendSampleRecords)}, "SpanData %v", sd)
}
}

func requestFromProfilesFunc(reqErr error) func(context.Context, pprofile.Profiles) (exporterhelper.Request, error) {
return func(_ context.Context, pd pprofile.Profiles) (exporterhelper.Request, error) {
return &requesttest.FakeRequest{Items: pd.SampleCount(), ExportErr: reqErr}, nil
}
}
2 changes: 1 addition & 1 deletion exporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ require (
go.opentelemetry.io/collector/extension/xextension v0.117.0
go.opentelemetry.io/collector/featuregate v1.23.0
go.opentelemetry.io/collector/pdata v1.23.0
go.opentelemetry.io/collector/pdata/pprofile v0.117.0
go.opentelemetry.io/collector/pdata/testdata v0.117.0
go.opentelemetry.io/collector/pipeline v0.117.0
go.opentelemetry.io/otel v1.32.0
Expand All @@ -44,6 +43,7 @@ require (
go.opentelemetry.io/collector/consumer/xconsumer v0.117.0 // indirect
go.opentelemetry.io/collector/exporter/xexporter v0.117.0 // indirect
go.opentelemetry.io/collector/extension v0.117.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.117.0 // indirect
go.opentelemetry.io/collector/receiver v0.117.0 // indirect
go.opentelemetry.io/collector/receiver/receivertest v0.117.0 // indirect
go.opentelemetry.io/collector/receiver/xreceiver v0.117.0 // indirect
Expand Down
Loading

0 comments on commit f07ebc3

Please sign in to comment.