From a2242f7c88882bb1bc2c312a23c58aa845e0c041 Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Mon, 21 Aug 2023 08:58:55 -0700 Subject: [PATCH] [exporterhelper] New exporter helper for custom requests (#8178) Introduce a new exporter helper that operates over client-provided requests instead of pdata. The helper user now has to provide `Converter` - an interface with a function implementing translation of pdata Metrics/Traces/Logs into a user-defined `Request`. `Request` is an interface with only one required function `Export`. It opens a door for moving batching to the exporter, where batches will be built from client data format, instead of pdata. The batches can be properly sized by custom request size, which can be different from OTLP. The same custom request sizing will be applied to the sending queue. It will also improve the performance of the sending queue retries for non-OTLP exporters, they don't need to translate pdata on every retry. This is an implementation alternative to https://github.com/open-telemetry/opentelemetry-collector/pull/7874 as suggested in https://github.com/open-telemetry/opentelemetry-collector/pull/7874#discussion_r1278444371 Tracking Issue: https://github.com/open-telemetry/opentelemetry-collector/issues/8122 --------- Co-authored-by: Alex Boten --- .chloggen/exporter-helper-v2.yaml | 12 +++ exporter/exporterhelper/common.go | 42 ++++++--- exporter/exporterhelper/common_test.go | 23 ++--- exporter/exporterhelper/constants.go | 6 ++ .../internal/persistent_queue.go | 14 ++- .../internal/persistent_queue_test.go | 10 ++- .../internal/persistent_storage.go | 14 +-- .../internal/persistent_storage_batch.go | 6 +- .../internal/persistent_storage_test.go | 15 +++- exporter/exporterhelper/internal/request.go | 6 +- exporter/exporterhelper/logs.go | 65 ++++++++++++-- exporter/exporterhelper/logs_test.go | 83 +++++++++++++++++- exporter/exporterhelper/metrics.go | 66 ++++++++++++-- exporter/exporterhelper/metrics_test.go | 86 ++++++++++++++++++- exporter/exporterhelper/queued_retry.go | 67 ++++++++------- exporter/exporterhelper/queued_retry_test.go | 53 ++++++++---- exporter/exporterhelper/request.go | 50 +++++++++++ exporter/exporterhelper/request_test.go | 44 ++++++++++ exporter/exporterhelper/traces.go | 62 +++++++++++-- exporter/exporterhelper/traces_test.go | 81 ++++++++++++++++- 20 files changed, 691 insertions(+), 114 deletions(-) create mode 100644 .chloggen/exporter-helper-v2.yaml create mode 100644 exporter/exporterhelper/request.go create mode 100644 exporter/exporterhelper/request_test.go diff --git a/.chloggen/exporter-helper-v2.yaml b/.chloggen/exporter-helper-v2.yaml new file mode 100644 index 00000000000..600a71f1b0c --- /dev/null +++ b/.chloggen/exporter-helper-v2.yaml @@ -0,0 +1,12 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporter/exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Introduce a new exporter helper that operates over client-provided requests instead of pdata + +# One or more tracking issues or pull requests related to the change +issues: [7874] + diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 08b3e015e41..8077d465741 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -56,32 +56,46 @@ func (req *baseRequest) OnProcessingFinished() { } } +type queueSettings struct { + config QueueSettings + marshaler internal.RequestMarshaler + unmarshaler internal.RequestUnmarshaler +} + +func (qs *queueSettings) persistenceEnabled() bool { + return qs.config.StorageID != nil && qs.marshaler != nil && qs.unmarshaler != nil +} + // baseSettings represents all the options that users can configure. type baseSettings struct { component.StartFunc component.ShutdownFunc consumerOptions []consumer.Option TimeoutSettings - QueueSettings + queueSettings RetrySettings + requestExporter bool } -// fromOptions returns the internal options starting from the default and applying all configured options. -func fromOptions(options ...Option) *baseSettings { - // Start from the default options: - opts := &baseSettings{ +// newBaseSettings returns the baseSettings starting from the default and applying all configured options. +// requestExporter indicates whether the base settings are for a new request exporter or not. +func newBaseSettings(requestExporter bool, options ...Option) *baseSettings { + bs := &baseSettings{ + requestExporter: requestExporter, TimeoutSettings: NewDefaultTimeoutSettings(), // TODO: Enable queuing by default (call DefaultQueueSettings) - QueueSettings: QueueSettings{Enabled: false}, + queueSettings: queueSettings{ + config: QueueSettings{Enabled: false}, + }, // TODO: Enable retry by default (call DefaultRetrySettings) RetrySettings: RetrySettings{Enabled: false}, } for _, op := range options { - op(opts) + op(bs) } - return opts + return bs } // Option apply changes to baseSettings. @@ -121,9 +135,13 @@ func WithRetry(retrySettings RetrySettings) Option { // WithQueue overrides the default QueueSettings for an exporter. // The default QueueSettings is to disable queueing. -func WithQueue(queueSettings QueueSettings) Option { +// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +func WithQueue(config QueueSettings) Option { return func(o *baseSettings) { - o.QueueSettings = queueSettings + if o.requestExporter { + panic("queueing is not available for the new request exporters yet") + } + o.queueSettings.config = config } } @@ -145,7 +163,7 @@ type baseExporter struct { qrSender *queuedRetrySender } -func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType, reqUnmarshaler internal.RequestUnmarshaler) (*baseExporter, error) { +func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType) (*baseExporter, error) { be := &baseExporter{} var err error @@ -154,7 +172,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo return nil, err } - be.qrSender = newQueuedRetrySender(set.ID, signal, bs.QueueSettings, bs.RetrySettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger) + be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger) be.sender = be.qrSender be.StartFunc = func(ctx context.Context, host component.Host) error { // First start the wrapped exporter. diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index 8f5a5376c39..9c5bcbe2513 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -14,11 +14,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/pdata/ptrace" ) var ( @@ -35,7 +32,11 @@ var ( ) func TestBaseExporter(t *testing.T) { - be, err := newBaseExporter(defaultSettings, fromOptions(), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false), "") + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, be.Shutdown(context.Background())) + be, err = newBaseExporter(defaultSettings, newBaseSettings(true), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) @@ -45,12 +46,12 @@ func TestBaseExporterWithOptions(t *testing.T) { want := errors.New("my error") be, err := newBaseExporter( defaultSettings, - fromOptions( + newBaseSettings( + false, WithStart(func(ctx context.Context, host component.Host) error { return want }), WithShutdown(func(ctx context.Context) error { return want }), WithTimeout(NewDefaultTimeoutSettings())), "", - nopRequestUnmarshaler(), ) require.NoError(t, err) require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost())) @@ -65,13 +66,3 @@ func checkStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) { require.Equal(t, codes.Unset, sd.Status().Code, "SpanData %v", sd) } } - -func nopTracePusher() consumer.ConsumeTracesFunc { - return func(ctx context.Context, ld ptrace.Traces) error { - return nil - } -} - -func nopRequestUnmarshaler() internal.RequestUnmarshaler { - return newTraceRequestUnmarshalerFunc(nopTracePusher()) -} diff --git a/exporter/exporterhelper/constants.go b/exporter/exporterhelper/constants.go index bdcbf1a4fd6..a7cfca32aca 100644 --- a/exporter/exporterhelper/constants.go +++ b/exporter/exporterhelper/constants.go @@ -18,4 +18,10 @@ var ( errNilPushMetricsData = errors.New("nil PushMetrics") // errNilPushLogsData is returned when a nil PushLogs is given. errNilPushLogsData = errors.New("nil PushLogs") + // errNilTracesConverter is returned when a nil TracesConverter is given. + errNilTracesConverter = errors.New("nil TracesConverter") + // errNilMetricsConverter is returned when a nil MetricsConverter is given. + errNilMetricsConverter = errors.New("nil MetricsConverter") + // errNilLogsConverter is returned when a nil LogsConverter is given. + errNilLogsConverter = errors.New("nil LogsConverter") ) diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index a7956d6f569..63a617daebd 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -35,11 +35,21 @@ func buildPersistentStorageName(name string, signal component.DataType) string { return fmt.Sprintf("%s-%s", name, signal) } +type PersistentQueueSettings struct { + Name string + Signal component.DataType + Capacity uint64 + Logger *zap.Logger + Client storage.Client + Unmarshaler RequestUnmarshaler + Marshaler RequestMarshaler +} + // NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage -func NewPersistentQueue(ctx context.Context, name string, signal component.DataType, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue { +func NewPersistentQueue(ctx context.Context, params PersistentQueueSettings) ProducerConsumerQueue { return &persistentQueue{ stopChan: make(chan struct{}), - storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(name, signal), uint64(capacity), logger, client, unmarshaler), + storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(params.Name, params.Signal), params), } } diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go index 13505440580..8b1ffb7674e 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/persistent_queue_test.go @@ -28,7 +28,15 @@ func createTestQueue(extension storage.Extension, capacity int) *persistentQueue panic(err) } - wq := NewPersistentQueue(context.Background(), "foo", component.DataTypeTraces, capacity, logger, client, newFakeTracesRequestUnmarshalerFunc()) + wq := NewPersistentQueue(context.Background(), PersistentQueueSettings{ + Name: "foo", + Signal: component.DataTypeTraces, + Capacity: uint64(capacity), + Logger: logger, + Client: client, + Unmarshaler: newFakeTracesRequestUnmarshalerFunc(), + Marshaler: newFakeTracesRequestMarshalerFunc(), + }) return wq.(*persistentQueue) } diff --git a/exporter/exporterhelper/internal/persistent_storage.go b/exporter/exporterhelper/internal/persistent_storage.go index 1d79d63a124..cbbcf2e03c5 100644 --- a/exporter/exporterhelper/internal/persistent_storage.go +++ b/exporter/exporterhelper/internal/persistent_storage.go @@ -43,6 +43,7 @@ type persistentContiguousStorage struct { queueName string client storage.Client unmarshaler RequestUnmarshaler + marshaler RequestMarshaler putChan chan struct{} stopChan chan struct{} @@ -80,14 +81,15 @@ var ( // newPersistentContiguousStorage creates a new file-storage extension backed queue; // queueName parameter must be a unique value that identifies the queue. -func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage { +func newPersistentContiguousStorage(ctx context.Context, queueName string, set PersistentQueueSettings) *persistentContiguousStorage { pcs := &persistentContiguousStorage{ - logger: logger, - client: client, + logger: set.Logger, + client: set.Client, queueName: queueName, - unmarshaler: unmarshaler, - capacity: capacity, - putChan: make(chan struct{}, capacity), + unmarshaler: set.Unmarshaler, + marshaler: set.Marshaler, + capacity: set.Capacity, + putChan: make(chan struct{}, set.Capacity), reqChan: make(chan Request), stopChan: make(chan struct{}), itemsCount: &atomic.Uint64{}, diff --git a/exporter/exporterhelper/internal/persistent_storage_batch.go b/exporter/exporterhelper/internal/persistent_storage_batch.go index 85c99cf51e9..a80ba93c5c3 100644 --- a/exporter/exporterhelper/internal/persistent_storage_batch.go +++ b/exporter/exporterhelper/internal/persistent_storage_batch.go @@ -137,7 +137,7 @@ func (bof *batchStruct) getItemIndexArrayResult(key string) ([]itemIndex, error) // setRequest adds Set operation over a given request to the batch func (bof *batchStruct) setRequest(key string, value Request) *batchStruct { - return bof.set(key, value, requestToBytes) + return bof.set(key, value, bof.requestToBytes) } // setItemIndex adds Set operation over a given itemIndex to the batch @@ -206,8 +206,8 @@ func bytesToItemIndexArray(b []byte) (any, error) { return val, err } -func requestToBytes(req any) ([]byte, error) { - return req.(Request).Marshal() +func (bof *batchStruct) requestToBytes(req any) ([]byte, error) { + return bof.pcs.marshaler(req.(Request)) } func (bof *batchStruct) bytesToRequest(b []byte) (any, error) { diff --git a/exporter/exporterhelper/internal/persistent_storage_test.go b/exporter/exporterhelper/internal/persistent_storage_test.go index b27836fd2b6..5d33fc11064 100644 --- a/exporter/exporterhelper/internal/persistent_storage_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_test.go @@ -36,7 +36,13 @@ func createTestClient(extension storage.Extension) storage.Client { } func createTestPersistentStorageWithLoggingAndCapacity(client storage.Client, logger *zap.Logger, capacity uint64) *persistentContiguousStorage { - return newPersistentContiguousStorage(context.Background(), "foo", capacity, logger, client, newFakeTracesRequestUnmarshalerFunc()) + return newPersistentContiguousStorage(context.Background(), "foo", PersistentQueueSettings{ + Capacity: capacity, + Logger: logger, + Client: client, + Unmarshaler: newFakeTracesRequestUnmarshalerFunc(), + Marshaler: newFakeTracesRequestMarshalerFunc(), + }) } func createTestPersistentStorage(client storage.Client) *persistentContiguousStorage { @@ -82,6 +88,13 @@ func newFakeTracesRequestUnmarshalerFunc() RequestUnmarshaler { } } +func newFakeTracesRequestMarshalerFunc() RequestMarshaler { + return func(req Request) ([]byte, error) { + marshaler := ptrace.ProtoMarshaler{} + return marshaler.MarshalTraces(req.(*fakeTracesRequest).td) + } +} + func TestPersistentStorage_CorruptedData(t *testing.T) { path := t.TempDir() diff --git a/exporter/exporterhelper/internal/request.go b/exporter/exporterhelper/internal/request.go index 390b35e94bd..454a42782ce 100644 --- a/exporter/exporterhelper/internal/request.go +++ b/exporter/exporterhelper/internal/request.go @@ -22,9 +22,6 @@ type Request interface { // Count returns the count of spans/metric points or log records. Count() int - // Marshal serializes the current request into a byte stream - Marshal() ([]byte, error) - // OnProcessingFinished calls the optional callback function to handle cleanup after all processing is finished OnProcessingFinished() @@ -34,3 +31,6 @@ type Request interface { // RequestUnmarshaler defines a function which takes a byte slice and unmarshals it into a relevant request type RequestUnmarshaler func([]byte) (Request, error) + +// RequestMarshaler defines a function which takes a request and marshals it into a byte slice +type RequestMarshaler func(Request) ([]byte, error) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index b53bfc8addb..0fdf1fc3858 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -7,6 +7,8 @@ import ( "context" "errors" + "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -42,6 +44,10 @@ func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) internal.Req } } +func logsRequestMarshaler(req internal.Request) ([]byte, error) { + return logsMarshaler.MarshalLogs(req.(*logsRequest).ld) +} + func (req *logsRequest) OnError(err error) internal.Request { var logError consumererror.Logs if errors.As(err, &logError) { @@ -54,10 +60,6 @@ func (req *logsRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.ld) } -func (req *logsRequest) Marshal() ([]byte, error) { - return logsMarshaler.MarshalLogs(req.ld) -} - func (req *logsRequest) Count() int { return req.ld.LogRecordCount() } @@ -87,8 +89,10 @@ func NewLogsExporter( return nil, errNilPushLogsData } - bs := fromOptions(options...) - be, err := newBaseExporter(set, bs, component.DataTypeLogs, newLogsRequestUnmarshalerFunc(pusher)) + bs := newBaseSettings(false, options...) + bs.marshaler = logsRequestMarshaler + bs.unmarshaler = newLogsRequestUnmarshalerFunc(pusher) + be, err := newBaseExporter(set, bs, component.DataTypeLogs) if err != nil { return nil, err } @@ -114,6 +118,55 @@ func NewLogsExporter( }, err } +type LogsConverter interface { + // RequestFromLogs converts plog.Logs data into a request. + RequestFromLogs(context.Context, plog.Logs) (Request, error) +} + +// NewLogsRequestExporter creates new logs exporter based on custom LogsConverter and RequestSender. +func NewLogsRequestExporter( + _ context.Context, + set exporter.CreateSettings, + converter LogsConverter, + options ...Option, +) (exporter.Logs, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilLogsConverter + } + + bs := newBaseSettings(true, options...) + + be, err := newBaseExporter(set, bs, component.DataTypeLogs) + if err != nil { + return nil, err + } + + // TODO: Add new observability tracing/metrics to the new exporterhelper. + + lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { + req, cErr := converter.RequestFromLogs(ctx, ld) + if cErr != nil { + set.Logger.Error("Failed to convert logs. Dropping data.", + zap.Int("dropped_log_records", ld.LogRecordCount()), + zap.Error(err)) + return consumererror.NewPermanent(cErr) + } + return be.sender.send(&request{ + baseRequest: baseRequest{ctx: ctx}, + Request: req, + }) + }, bs.consumerOptions...) + + return &logsExporter{ + baseExporter: be, + Logs: lc, + }, err +} + type logsExporterWithObservability struct { obsrep *obsExporter nextSender requestSender diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 3aa50b26e02..14e35679ff4 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -1,5 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 + package exporterhelper import ( @@ -59,12 +60,24 @@ func TestLogsExporter_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } +func TestLogsRequestExporter_NilLogger(t *testing.T) { + le, err := NewLogsRequestExporter(context.Background(), exporter.CreateSettings{}, &fakeRequestConverter{}) + require.Nil(t, le) + require.Equal(t, errNilLogger, err) +} + func TestLogsExporter_NilPushLogsData(t *testing.T) { le, err := NewLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeLogsExporterConfig, nil) require.Nil(t, le) require.Equal(t, errNilPushLogsData, err) } +func TestLogsRequestExporter_NilLogsConverter(t *testing.T) { + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), nil) + require.Nil(t, le) + require.Equal(t, errNilLogsConverter, err) +} + func TestLogsExporter_Default(t *testing.T) { ld := plog.NewLogs() le, err := NewLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeLogsExporterConfig, newPushLogsData(nil)) @@ -77,6 +90,18 @@ func TestLogsExporter_Default(t *testing.T) { assert.NoError(t, le.Shutdown(context.Background())) } +func TestLogsRequestExporter_Default(t *testing.T) { + ld := plog.NewLogs() + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}) + assert.NotNil(t, le) + assert.NoError(t, err) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, le.Capabilities()) + assert.NoError(t, le.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, le.ConsumeLogs(context.Background(), ld)) + assert.NoError(t, le.Shutdown(context.Background())) +} + func TestLogsExporter_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} le, err := NewLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeLogsExporterConfig, newPushLogsData(nil), WithCapabilities(capabilities)) @@ -86,6 +111,15 @@ func TestLogsExporter_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, le.Capabilities()) } +func TestLogsRequestExporter_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, WithCapabilities(capabilities)) + require.NoError(t, err) + require.NotNil(t, le) + + assert.Equal(t, capabilities, le.Capabilities()) +} + func TestLogsExporter_Default_ReturnError(t *testing.T) { ld := plog.NewLogs() want := errors.New("my_error") @@ -95,7 +129,26 @@ func TestLogsExporter_Default_ReturnError(t *testing.T) { require.Equal(t, want, le.ConsumeLogs(context.Background(), ld)) } -func TestLogsExporter_WithRecordLogs(t *testing.T) { +func TestLogsRequestExporter_Default_ConvertError(t *testing.T) { + ld := plog.NewLogs() + want := errors.New("convert_error") + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{logsError: want}) + require.NoError(t, err) + require.NotNil(t, le) + require.Equal(t, consumererror.NewPermanent(want), le.ConsumeLogs(context.Background(), ld)) +} + +func TestLogsRequestExporter_Default_ExportError(t *testing.T) { + ld := plog.NewLogs() + want := errors.New("export_error") + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{requestError: want}) + require.NoError(t, err) + require.NotNil(t, le) + require.Equal(t, want, le.ConsumeLogs(context.Background(), ld)) +} + +func TestLogsExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) @@ -107,7 +160,7 @@ func TestLogsExporter_WithRecordLogs(t *testing.T) { checkRecordedMetricsForLogsExporter(t, tt, le, nil) } -func TestLogsExporter_WithRecordLogs_ReturnError(t *testing.T) { +func TestLogsExporter_WithRecordMetrics_ReturnError(t *testing.T) { want := errors.New("my_error") tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName) require.NoError(t, err) @@ -184,6 +237,18 @@ func TestLogsExporter_WithShutdown(t *testing.T) { assert.True(t, shutdownCalled) } +func TestLogsRequestExporter_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, WithShutdown(shutdown)) + assert.NotNil(t, le) + assert.NoError(t, err) + + assert.Nil(t, le.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + func TestLogsExporter_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } @@ -195,6 +260,17 @@ func TestLogsExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, le.Shutdown(context.Background()), want) } +func TestLogsRequestExporter_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, WithShutdown(shutdownErr)) + assert.NotNil(t, le) + assert.NoError(t, err) + + assert.Equal(t, le.Shutdown(context.Background()), want) +} + func newPushLogsData(retError error) consumer.ConsumeLogsFunc { return func(ctx context.Context, td plog.Logs) error { return retError @@ -225,7 +301,8 @@ func generateLogsTraffic(t *testing.T, tracer trace.Tracer, le exporter.Logs, nu } } -func checkWrapSpanForLogsExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, le exporter.Logs, wantError error, numLogRecords int64) { +func checkWrapSpanForLogsExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, le exporter.Logs, + wantError error, numLogRecords int64) { // nolint: unparam const numRequests = 5 generateLogsTraffic(t, tracer, le, numRequests, wantError) diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 1639a45fcab..906d0d6f855 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -7,6 +7,8 @@ import ( "context" "errors" + "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -42,6 +44,10 @@ func newMetricsRequestUnmarshalerFunc(pusher consumer.ConsumeMetricsFunc) intern } } +func metricsRequestMarshaler(req internal.Request) ([]byte, error) { + return metricsMarshaler.MarshalMetrics(req.(*metricsRequest).md) +} + func (req *metricsRequest) OnError(err error) internal.Request { var metricsError consumererror.Metrics if errors.As(err, &metricsError) { @@ -54,11 +60,6 @@ func (req *metricsRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.md) } -// Marshal provides serialization capabilities required by persistent queue -func (req *metricsRequest) Marshal() ([]byte, error) { - return metricsMarshaler.MarshalMetrics(req.md) -} - func (req *metricsRequest) Count() int { return req.md.DataPointCount() } @@ -88,8 +89,10 @@ func NewMetricsExporter( return nil, errNilPushMetricsData } - bs := fromOptions(options...) - be, err := newBaseExporter(set, bs, component.DataTypeMetrics, newMetricsRequestUnmarshalerFunc(pusher)) + bs := newBaseSettings(false, options...) + bs.marshaler = metricsRequestMarshaler + bs.unmarshaler = newMetricsRequestUnmarshalerFunc(pusher) + be, err := newBaseExporter(set, bs, component.DataTypeMetrics) if err != nil { return nil, err } @@ -115,6 +118,55 @@ func NewMetricsExporter( }, err } +type MetricsConverter interface { + // RequestFromMetrics converts pdata.Metrics into a request. + RequestFromMetrics(context.Context, pmetric.Metrics) (Request, error) +} + +// NewMetricsRequestExporter creates a new metrics exporter based on a custom MetricsConverter and RequestSender. +func NewMetricsRequestExporter( + _ context.Context, + set exporter.CreateSettings, + converter MetricsConverter, + options ...Option, +) (exporter.Metrics, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilMetricsConverter + } + + bs := newBaseSettings(true, options...) + + be, err := newBaseExporter(set, bs, component.DataTypeMetrics) + if err != nil { + return nil, err + } + + // TODO: Add new observability tracing/metrics to the new exporterhelper. + + mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { + req, cErr := converter.RequestFromMetrics(ctx, md) + if cErr != nil { + set.Logger.Error("Failed to convert metrics. Dropping data.", + zap.Int("dropped_data_points", md.DataPointCount()), + zap.Error(err)) + return consumererror.NewPermanent(cErr) + } + return be.sender.send(&request{ + Request: req, + baseRequest: baseRequest{ctx: ctx}, + }) + }, bs.consumerOptions...) + + return &metricsExporter{ + baseExporter: be, + Metrics: mc, + }, err +} + type metricsSenderWithObservability struct { obsrep *obsExporter nextSender requestSender diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 5ebc2d9ec65..c8ea7587219 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -1,5 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 + package exporterhelper import ( @@ -47,7 +48,7 @@ func TestMetricsRequest(t *testing.T) { ) } -func TestMetricsExporter_InvalidName(t *testing.T) { +func TestMetricsExporter_NilConfig(t *testing.T) { me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), nil, newPushMetricsData(nil)) require.Nil(t, me) require.Equal(t, errNilConfig, err) @@ -59,12 +60,24 @@ func TestMetricsExporter_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } +func TestMetricsRequestExporter_NilLogger(t *testing.T) { + me, err := NewMetricsRequestExporter(context.Background(), exporter.CreateSettings{}, fakeRequestConverter{}) + require.Nil(t, me) + require.Equal(t, errNilLogger, err) +} + func TestMetricsExporter_NilPushMetricsData(t *testing.T) { me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeMetricsExporterConfig, nil) require.Nil(t, me) require.Equal(t, errNilPushMetricsData, err) } +func TestMetricsRequestExporter_NilMetricsConverter(t *testing.T) { + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), nil) + require.Nil(t, me) + require.Equal(t, errNilMetricsConverter, err) +} + func TestMetricsExporter_Default(t *testing.T) { md := pmetric.NewMetrics() me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeMetricsExporterConfig, newPushMetricsData(nil)) @@ -77,6 +90,18 @@ func TestMetricsExporter_Default(t *testing.T) { assert.NoError(t, me.Shutdown(context.Background())) } +func TestMetricsRequestExporter_Default(t *testing.T) { + md := pmetric.NewMetrics() + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}) + assert.NoError(t, err) + assert.NotNil(t, me) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, me.Capabilities()) + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.ConsumeMetrics(context.Background(), md)) + assert.NoError(t, me.Shutdown(context.Background())) +} + func TestMetricsExporter_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} me, err := NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeMetricsExporterConfig, newPushMetricsData(nil), WithCapabilities(capabilities)) @@ -86,6 +111,16 @@ func TestMetricsExporter_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, me.Capabilities()) } +func TestMetricsRequestExporter_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{}, + WithCapabilities(capabilities)) + assert.NoError(t, err) + assert.NotNil(t, me) + + assert.Equal(t, capabilities, me.Capabilities()) +} + func TestMetricsExporter_Default_ReturnError(t *testing.T) { md := pmetric.NewMetrics() want := errors.New("my_error") @@ -95,6 +130,25 @@ func TestMetricsExporter_Default_ReturnError(t *testing.T) { require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) } +func TestMetricsRequestExporter_Default_ConvertError(t *testing.T) { + md := pmetric.NewMetrics() + want := errors.New("convert_error") + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), fakeRequestConverter{metricsError: want}) + require.NoError(t, err) + require.NotNil(t, me) + require.Equal(t, consumererror.NewPermanent(want), me.ConsumeMetrics(context.Background(), md)) +} + +func TestMetricsRequestExporter_Default_ExportError(t *testing.T) { + md := pmetric.NewMetrics() + want := errors.New("export_error") + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + fakeRequestConverter{requestError: want}) + require.NoError(t, err) + require.NotNil(t, me) + require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) +} + func TestMetricsExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName) require.NoError(t, err) @@ -185,6 +239,20 @@ func TestMetricsExporter_WithShutdown(t *testing.T) { assert.True(t, shutdownCalled) } +func TestMetricsRequestExporter_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{}, WithShutdown(shutdown)) + assert.NotNil(t, me) + assert.NoError(t, err) + + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } @@ -197,6 +265,19 @@ func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, want, me.Shutdown(context.Background())) } +func TestMetricsRequestExporter_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + me, err := NewMetricsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{}, WithShutdown(shutdownErr)) + assert.NotNil(t, me) + assert.NoError(t, err) + + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, want, me.Shutdown(context.Background())) +} + func newPushMetricsData(retError error) consumer.ConsumeMetricsFunc { return func(ctx context.Context, td pmetric.Metrics) error { return retError @@ -228,7 +309,8 @@ func generateMetricsTraffic(t *testing.T, tracer trace.Tracer, me exporter.Metri } } -func checkWrapSpanForMetricsExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, me exporter.Metrics, wantError error, numMetricPoints int64) { +func checkWrapSpanForMetricsExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, + me exporter.Metrics, wantError error, numMetricPoints int64) { // nolint: unparam const numRequests = 5 generateMetricsTraffic(t, tracer, me, numRequests, wantError) diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index 2cf5f627d46..79505445b47 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -70,33 +70,32 @@ func (qCfg *QueueSettings) Validate() error { } type queuedRetrySender struct { - fullName string - id component.ID - signal component.DataType - cfg QueueSettings - consumerSender requestSender - queue internal.ProducerConsumerQueue - retryStopCh chan struct{} - traceAttribute attribute.KeyValue - logger *zap.Logger - requeuingEnabled bool - requestUnmarshaler internal.RequestUnmarshaler + fullName string + id component.ID + signal component.DataType + queueSettings queueSettings + consumerSender requestSender + queue internal.ProducerConsumerQueue + retryStopCh chan struct{} + traceAttribute attribute.KeyValue + logger *zap.Logger + requeuingEnabled bool } -func newQueuedRetrySender(id component.ID, signal component.DataType, qCfg QueueSettings, rCfg RetrySettings, reqUnmarshaler internal.RequestUnmarshaler, nextSender requestSender, logger *zap.Logger) *queuedRetrySender { +func newQueuedRetrySender(id component.ID, signal component.DataType, qs queueSettings, rCfg RetrySettings, + nextSender requestSender, logger *zap.Logger) *queuedRetrySender { retryStopCh := make(chan struct{}) sampledLogger := createSampledLogger(logger) traceAttr := attribute.String(obsmetrics.ExporterKey, id.String()) qrs := &queuedRetrySender{ - fullName: id.String(), - id: id, - signal: signal, - cfg: qCfg, - retryStopCh: retryStopCh, - traceAttribute: traceAttr, - logger: sampledLogger, - requestUnmarshaler: reqUnmarshaler, + fullName: id.String(), + id: id, + signal: signal, + queueSettings: qs, + retryStopCh: retryStopCh, + traceAttribute: traceAttr, + logger: sampledLogger, } qrs.consumerSender = &retrySender{ @@ -109,8 +108,8 @@ func newQueuedRetrySender(id component.ID, signal component.DataType, qCfg Queue onTemporaryFailure: qrs.onTemporaryFailure, } - if qCfg.StorageID == nil { - qrs.queue = internal.NewBoundedMemoryQueue(qrs.cfg.QueueSize) + if !qs.persistenceEnabled() { + qrs.queue = internal.NewBoundedMemoryQueue(qs.config.QueueSize) } // The Persistent Queue is initialized separately as it needs extra information about the component @@ -143,16 +142,24 @@ func toStorageClient(ctx context.Context, storageID component.ID, host component // initializePersistentQueue uses extra information for initialization available from component.Host func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error { - if qrs.cfg.StorageID == nil { + if !qrs.queueSettings.persistenceEnabled() { return nil } - storageClient, err := toStorageClient(ctx, *qrs.cfg.StorageID, host, qrs.id, qrs.signal) + storageClient, err := toStorageClient(ctx, *qrs.queueSettings.config.StorageID, host, qrs.id, qrs.signal) if err != nil { return err } - qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, storageClient, qrs.requestUnmarshaler) + qrs.queue = internal.NewPersistentQueue(ctx, internal.PersistentQueueSettings{ + Name: qrs.fullName, + Signal: qrs.signal, + Capacity: uint64(qrs.queueSettings.config.QueueSize), + Logger: qrs.logger, + Client: storageClient, + Marshaler: qrs.queueSettings.marshaler, + Unmarshaler: qrs.queueSettings.unmarshaler, + }) // TODO: this can be further exposed as a config param rather than relying on a type of queue qrs.requeuingEnabled = true @@ -191,13 +198,13 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er return err } - qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item internal.Request) { + qrs.queue.StartConsumers(qrs.queueSettings.config.NumConsumers, func(item internal.Request) { _ = qrs.consumerSender.send(item) item.OnProcessingFinished() }) // Start reporting queue length metric - if qrs.cfg.Enabled { + if qrs.queueSettings.config.Enabled { err := globalInstruments.queueSize.UpsertEntry(func() int64 { return int64(qrs.queue.Size()) }, metricdata.NewLabelValue(qrs.fullName)) @@ -205,7 +212,7 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er return fmt.Errorf("failed to create retry queue size metric: %w", err) } err = globalInstruments.queueCapacity.UpsertEntry(func() int64 { - return int64(qrs.cfg.QueueSize) + return int64(qrs.queueSettings.config.QueueSize) }, metricdata.NewLabelValue(qrs.fullName)) if err != nil { return fmt.Errorf("failed to create retry queue capacity metric: %w", err) @@ -218,7 +225,7 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er // shutdown is invoked during service shutdown. func (qrs *queuedRetrySender) shutdown() { // Cleanup queue metrics reporting - if qrs.cfg.Enabled { + if qrs.queueSettings.config.Enabled { _ = globalInstruments.queueSize.UpsertEntry(func() int64 { return int64(0) }, metricdata.NewLabelValue(qrs.fullName)) @@ -287,7 +294,7 @@ func createSampledLogger(logger *zap.Logger) *zap.Logger { // send implements the requestSender interface func (qrs *queuedRetrySender) send(req internal.Request) error { - if !qrs.cfg.Enabled { + if !qrs.queueSettings.config.Enabled { err := qrs.consumerSender.send(req) if err != nil { qrs.logger.Error( diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index d3e8f4b8c64..a55eade8680 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -36,11 +36,18 @@ func mockRequestUnmarshaler(mr *mockRequest) internal.RequestUnmarshaler { } } +func mockRequestMarshaler(_ internal.Request) ([]byte, error) { + return nil, nil +} + func TestQueuedRetry_DropOnPermanentError(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() mockR := newMockRequest(context.Background(), 2, consumererror.NewPermanent(errors.New("bad data"))) - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", mockRequestUnmarshaler(mockR)) + bs := newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)) + bs.marshaler = mockRequestMarshaler + bs.unmarshaler = mockRequestUnmarshaler(mockR) + be, err := newBaseExporter(defaultSettings, bs, "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -64,7 +71,10 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() rCfg.Enabled = false - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + bs := newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)) + bs.marshaler = mockRequestMarshaler + bs.unmarshaler = mockRequestUnmarshaler(newMockRequest(context.Background(), 2, errors.New("transient error"))) + be, err := newBaseExporter(defaultSettings, bs, "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -90,7 +100,7 @@ func TestQueuedRetry_OnError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -117,7 +127,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -151,7 +161,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -181,7 +191,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = time.Millisecond rCfg.MaxElapsedTime = 100 * time.Millisecond - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -228,7 +238,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 10 * time.Millisecond - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -261,7 +271,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { qCfg.QueueSize = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -288,7 +298,7 @@ func TestQueuedRetry_DropOnFull(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -309,7 +319,7 @@ func TestQueuedRetryHappyPath(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(set, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -344,7 +354,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 0 // to make every request go straight to the queue rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -478,7 +488,7 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs @@ -510,7 +520,7 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) be.qrSender.requeuingEnabled = true require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -535,7 +545,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(set, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -559,7 +569,10 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + bs := newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)) + bs.marshaler = mockRequestMarshaler + bs.unmarshaler = mockRequestUnmarshaler(&mockRequest{}) + be, err := newBaseExporter(set, bs, "") require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -583,7 +596,7 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { req := newMockRequest(context.Background(), 3, errors.New("some error")) - be, err := newBaseExporter(defaultSettings, fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) + be, err := newBaseExporter(defaultSettings, newBaseSettings(false, WithRetry(rCfg), WithQueue(qCfg)), "") require.NoError(t, err) require.NoError(t, be.Start(context.Background(), &mockHost{})) @@ -617,6 +630,14 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { }, time.Second, 1*time.Millisecond) } +func TestQueueRetryOptionsWithRequestExporter(t *testing.T) { + bs := newBaseSettings(true, WithRetry(NewDefaultRetrySettings())) + assert.True(t, bs.requestExporter) + assert.Panics(t, func() { + _ = newBaseSettings(true, WithRetry(NewDefaultRetrySettings()), WithQueue(NewDefaultQueueSettings())) + }) +} + type mockErrorRequest struct { baseRequest } diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go new file mode 100644 index 00000000000..30f250c8ba0 --- /dev/null +++ b/exporter/exporterhelper/request.go @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "context" + + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +) + +// Request represents a single request that can be sent to an external endpoint. +type Request interface { + // Export exports the request to an external endpoint. + Export(ctx context.Context) error +} + +// RequestItemsCounter is an optional interface that can be implemented by Request to provide a number of items +// in the request. This is a recommended interface to implement for exporters. It is required for batching and queueing +// based on number of items. Also, it's used for reporting number of items in collector's logs, metrics and traces. +// If not implemented, collector's logs, metrics and traces will report 0 items. +type RequestItemsCounter interface { + // ItemsCount returns a number of basic items in the request where item is the smallest piece of data that can be + // sent. For example, for OTLP exporter, this value represents the number of spans, + // metric data points or log records. + ItemsCount() int +} + +type request struct { + Request + baseRequest +} + +var _ internal.Request = (*request)(nil) + +func (req *request) OnError(_ error) internal.Request { + // Potentially we could introduce a new RequestError type that would represent partially succeeded request. + // In that case we should consider returning them back to the pipeline converted back to pdata in case if + // sending queue is disabled. We leave it as a future improvement if decided that it's needed. + return req +} + +// Count returns a number of items in the request. If the request does not implement RequestItemsCounter +// then 0 is returned. +func (req *request) Count() int { + if counter, ok := req.Request.(RequestItemsCounter); ok { + return counter.ItemsCount() + } + return 0 +} diff --git a/exporter/exporterhelper/request_test.go b/exporter/exporterhelper/request_test.go new file mode 100644 index 00000000000..6dd3f67800a --- /dev/null +++ b/exporter/exporterhelper/request_test.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper + +import ( + "context" + + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +type fakeRequest struct { + items int + err error +} + +func (r fakeRequest) Export(_ context.Context) error { + return r.err +} + +func (r fakeRequest) ItemsCount() int { + return r.items +} + +type fakeRequestConverter struct { + metricsError error + tracesError error + logsError error + requestError error +} + +func (c fakeRequestConverter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (Request, error) { + return fakeRequest{items: md.DataPointCount(), err: c.requestError}, c.metricsError +} + +func (c fakeRequestConverter) RequestFromTraces(_ context.Context, td ptrace.Traces) (Request, error) { + return fakeRequest{items: td.SpanCount(), err: c.requestError}, c.tracesError +} + +func (c fakeRequestConverter) RequestFromLogs(_ context.Context, ld plog.Logs) (Request, error) { + return fakeRequest{items: ld.LogRecordCount(), err: c.requestError}, c.logsError +} diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 978ece2201c..886c0a2f197 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -7,6 +7,8 @@ import ( "context" "errors" + "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -42,9 +44,8 @@ func newTraceRequestUnmarshalerFunc(pusher consumer.ConsumeTracesFunc) internal. } } -// Marshal provides serialization capabilities required by persistent queue -func (req *tracesRequest) Marshal() ([]byte, error) { - return tracesMarshaler.MarshalTraces(req.td) +func tracesRequestMarshaler(req internal.Request) ([]byte, error) { + return tracesMarshaler.MarshalTraces(req.(*tracesRequest).td) } func (req *tracesRequest) OnError(err error) internal.Request { @@ -88,8 +89,10 @@ func NewTracesExporter( return nil, errNilPushTraceData } - bs := fromOptions(options...) - be, err := newBaseExporter(set, bs, component.DataTypeTraces, newTraceRequestUnmarshalerFunc(pusher)) + bs := newBaseSettings(false, options...) + bs.marshaler = tracesRequestMarshaler + bs.unmarshaler = newTraceRequestUnmarshalerFunc(pusher) + be, err := newBaseExporter(set, bs, component.DataTypeTraces) if err != nil { return nil, err } @@ -115,6 +118,55 @@ func NewTracesExporter( }, err } +type TracesConverter interface { + // RequestFromTraces converts ptrace.Traces into a Request. + RequestFromTraces(context.Context, ptrace.Traces) (Request, error) +} + +// NewTracesRequestExporter creates a new traces exporter based on a custom TracesConverter and RequestSender. +func NewTracesRequestExporter( + _ context.Context, + set exporter.CreateSettings, + converter TracesConverter, + options ...Option, +) (exporter.Traces, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilTracesConverter + } + + bs := newBaseSettings(true, options...) + + be, err := newBaseExporter(set, bs, component.DataTypeTraces) + if err != nil { + return nil, err + } + + // TODO: Add new observability tracing/metrics to the new exporterhelper. + + tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { + req, cErr := converter.RequestFromTraces(ctx, td) + if cErr != nil { + set.Logger.Error("Failed to convert traces. Dropping data.", + zap.Int("dropped_spans", td.SpanCount()), + zap.Error(err)) + return consumererror.NewPermanent(cErr) + } + return be.sender.send(&request{ + baseRequest: baseRequest{ctx: ctx}, + Request: req, + }) + }, bs.consumerOptions...) + + return &traceExporter{ + baseExporter: be, + Traces: tc, + }, err +} + type tracesExporterWithObservability struct { obsrep *obsExporter nextSender requestSender diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 3b850ce6be2..0b314426f10 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -1,5 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 + package exporterhelper import ( @@ -55,12 +56,24 @@ func TestTracesExporter_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } +func TestTracesRequestExporter_NilLogger(t *testing.T) { + te, err := NewTracesRequestExporter(context.Background(), exporter.CreateSettings{}, &fakeRequestConverter{}) + require.Nil(t, te) + require.Equal(t, errNilLogger, err) +} + func TestTracesExporter_NilPushTraceData(t *testing.T) { te, err := NewTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeTracesExporterConfig, nil) require.Nil(t, te) require.Equal(t, errNilPushTraceData, err) } +func TestTracesRequestExporter_NilTracesConverter(t *testing.T) { + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), nil) + require.Nil(t, te) + require.Equal(t, errNilTracesConverter, err) +} + func TestTracesExporter_Default(t *testing.T) { td := ptrace.NewTraces() te, err := NewTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeTracesExporterConfig, newTraceDataPusher(nil)) @@ -73,6 +86,18 @@ func TestTracesExporter_Default(t *testing.T) { assert.NoError(t, te.Shutdown(context.Background())) } +func TestTracesRequestExporter_Default(t *testing.T) { + td := ptrace.NewTraces() + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, te.Capabilities()) + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, te.ConsumeTraces(context.Background(), td)) + assert.NoError(t, te.Shutdown(context.Background())) +} + func TestTracesExporter_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} te, err := NewTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeTracesExporterConfig, newTraceDataPusher(nil), WithCapabilities(capabilities)) @@ -82,6 +107,15 @@ func TestTracesExporter_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, te.Capabilities()) } +func TestTracesRequestExporter_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, WithCapabilities(capabilities)) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.Equal(t, capabilities, te.Capabilities()) +} + func TestTracesExporter_Default_ReturnError(t *testing.T) { td := ptrace.NewTraces() want := errors.New("my_error") @@ -93,6 +127,25 @@ func TestTracesExporter_Default_ReturnError(t *testing.T) { require.Equal(t, want, err) } +func TestTracesRequestExporter_Default_ConvertError(t *testing.T) { + td := ptrace.NewTraces() + want := errors.New("convert_error") + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), + &fakeRequestConverter{tracesError: want}) + require.NoError(t, err) + require.NotNil(t, te) + require.Equal(t, consumererror.NewPermanent(want), te.ConsumeTraces(context.Background(), td)) +} + +func TestTracesRequestExporter_Default_ExportError(t *testing.T) { + td := ptrace.NewTraces() + want := errors.New("export_error") + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{requestError: want}) + require.NoError(t, err) + require.NotNil(t, te) + require.Equal(t, want, te.ConsumeTraces(context.Background(), td)) +} + func TestTracesExporter_WithRecordMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName) require.NoError(t, err) @@ -185,6 +238,19 @@ func TestTracesExporter_WithShutdown(t *testing.T) { assert.True(t, shutdownCalled) } +func TestTracesRequestExporter_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, WithShutdown(shutdown)) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, te.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + func TestTracesExporter_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } @@ -197,6 +263,18 @@ func TestTracesExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, te.Shutdown(context.Background()), want) } +func TestTracesRequestExporter_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + te, err := NewTracesRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, WithShutdown(shutdownErr)) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, te.Shutdown(context.Background()), want) +} + func newTraceDataPusher(retError error) consumer.ConsumeTracesFunc { return func(ctx context.Context, td ptrace.Traces) error { return retError @@ -228,7 +306,8 @@ func generateTraceTraffic(t *testing.T, tracer trace.Tracer, te exporter.Traces, } } -func checkWrapSpanForTracesExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, te exporter.Traces, wantError error, numSpans int64) { +func checkWrapSpanForTracesExporter(t *testing.T, sr *tracetest.SpanRecorder, tracer trace.Tracer, + te exporter.Traces, wantError error, numSpans int64) { // nolint: unparam const numRequests = 5 generateTraceTraffic(t, tracer, te, numRequests, wantError)