From a0e3a2524421a4618ddc2b735abb83af1b92ae3b Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Mon, 24 Aug 2020 19:32:23 -0700 Subject: [PATCH] Remove ConsumerOld Signed-off-by: Bogdan Drutu --- consumer/consumer.go | 27 -- consumer/converter/converter.go | 129 --------- consumer/converter/converter_test.go | 102 ------- exporter/exportertest/nop_exporter.go | 6 +- exporter/exportertest/sink_exporter.go | 99 ------- exporter/exportertest/sink_exporter_test.go | 57 ---- processor/cloningfanoutconnector.go | 166 ++--------- processor/cloningfanoutconnector_test.go | 291 ++------------------ processor/fanoutconnector.go | 116 ++------ processor/fanoutconnector_test.go | 259 +++-------------- service/builder/exporters_builder.go | 12 +- service/builder/pipelines_builder.go | 58 ++-- service/builder/receivers_builder.go | 54 +--- 13 files changed, 142 insertions(+), 1234 deletions(-) delete mode 100644 consumer/converter/converter.go delete mode 100644 consumer/converter/converter_test.go diff --git a/consumer/consumer.go b/consumer/consumer.go index b29b871f30ab..ffe78246ac5e 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -18,45 +18,18 @@ package consumer import ( "context" - "go.opentelemetry.io/collector/consumer/consumerdata" "go.opentelemetry.io/collector/consumer/pdata" ) -// MetricsConsumerBase defines a common interface for MetricsConsumerOld and MetricsConsumer. -type MetricsConsumerBase interface{} - -// MetricsConsumerOld is an interface that receives consumerdata.MetricsData, process it as needed, and -// sends it to the next processing node if any or to the destination. -// -// ConsumeMetricsData receives consumerdata.MetricsData for processing by the MetricsConsumer. -type MetricsConsumerOld interface { - MetricsConsumerBase - ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error -} - // MetricsConsumer is the new metrics consumer interface that receives pdata.MetricData, processes it // as needed, and sends it to the next processing node if any or to the destination. type MetricsConsumer interface { - MetricsConsumerBase ConsumeMetrics(ctx context.Context, md pdata.Metrics) error } -// TraceConsumerBase defines a common interface for TraceConsumerOld and TraceConsumer. -type TraceConsumerBase interface{} - -// TraceConsumerOld is an interface that receives consumerdata.TraceData, process it as needed, and -// sends it to the next processing node if any or to the destination. -// -// ConsumeTraceData receives consumerdata.TraceData for processing by the TraceConsumer. -type TraceConsumerOld interface { - TraceConsumerBase - ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error -} - // TraceConsumer is an interface that receives pdata.Traces, processes it // as needed, and sends it to the next processing node if any or to the destination. type TraceConsumer interface { - TraceConsumerBase // ConsumeTraces receives pdata.Traces for processing. ConsumeTraces(ctx context.Context, td pdata.Traces) error } diff --git a/consumer/converter/converter.go b/consumer/converter/converter.go deleted file mode 100644 index afa394151bb0..000000000000 --- a/consumer/converter/converter.go +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package converter - -import ( - "context" - - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerdata" - "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/consumer/pdatautil" - "go.opentelemetry.io/collector/translator/internaldata" -) - -// NewInternalToOCTraceConverter creates new internalToOCTraceConverter that takes TraceConsumer and -// implements ConsumeTraces interface. -func NewInternalToOCTraceConverter(tc consumer.TraceConsumerOld) consumer.TraceConsumer { - return &internalToOCTraceConverter{tc} -} - -// internalToOCTraceConverter is a internal to oc translation shim that takes TraceConsumer and -// implements ConsumeTraces interface. -type internalToOCTraceConverter struct { - traceConsumer consumer.TraceConsumerOld -} - -// ConsumeTraces takes new-style data.Traces method, converts it to OC and uses old-style ConsumeTraceData method -// to process the trace data. -func (tc *internalToOCTraceConverter) ConsumeTraces(ctx context.Context, td pdata.Traces) error { - ocTraces := internaldata.TraceDataToOC(td) - for i := range ocTraces { - err := tc.traceConsumer.ConsumeTraceData(ctx, ocTraces[i]) - if err != nil { - return err - } - } - return nil -} - -var _ consumer.TraceConsumer = (*internalToOCTraceConverter)(nil) - -// NewInternalToOCMetricsConverter creates new internalToOCMetricsConverter that takes MetricsConsumer and -// implements ConsumeTraces interface. -func NewInternalToOCMetricsConverter(mc consumer.MetricsConsumerOld) consumer.MetricsConsumer { - return &internalToOCMetricsConverter{mc} -} - -// internalToOCMetricsConverter is a internal to oc translation shim that takes MetricsConsumer and -// implements ConsumeMetrics interface. -type internalToOCMetricsConverter struct { - metricsConsumer consumer.MetricsConsumerOld -} - -// ConsumeMetrics takes new-style data.MetricData method, converts it to OC and uses old-style ConsumeMetricsData method -// to process the metrics data. -func (tc *internalToOCMetricsConverter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { - ocMetrics := pdatautil.MetricsToMetricsData(md) - for i := range ocMetrics { - err := tc.metricsConsumer.ConsumeMetricsData(ctx, ocMetrics[i]) - if err != nil { - return err - } - } - return nil -} - -var _ consumer.MetricsConsumer = (*internalToOCMetricsConverter)(nil) - -// NewOCToInternalTraceConverter creates new ocToInternalTraceConverter that takes TraceConsumer and -// implements ConsumeTraces interface. -func NewOCToInternalTraceConverter(tc consumer.TraceConsumer) consumer.TraceConsumerOld { - return &ocToInternalTraceConverter{tc} -} - -// ocToInternalTraceConverter is a internal to oc translation shim that takes TraceConsumer and -// implements ConsumeTraces interface. -type ocToInternalTraceConverter struct { - traceConsumer consumer.TraceConsumer -} - -// ConsumeTraces takes new-style data.Traces method, converts it to OC and uses old-style ConsumeTraceData method -// to process the trace data. 
-func (tc *ocToInternalTraceConverter) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { - traceData := internaldata.OCToTraceData(td) - err := tc.traceConsumer.ConsumeTraces(ctx, traceData) - if err != nil { - return err - } - - return nil -} - -var _ consumer.TraceConsumerOld = (*ocToInternalTraceConverter)(nil) - -// NewOCToInternalMetricsConverter creates new ocToInternalMetricsConverter that takes MetricsConsumer and -// implements ConsumeTraces interface. -func NewOCToInternalMetricsConverter(tc consumer.MetricsConsumer) consumer.MetricsConsumerOld { - return &ocToInternalMetricsConverter{tc} -} - -// ocToInternalMetricsConverter is a internal to oc translation shim that takes MetricsConsumer and -// implements ConsumeMetrics interface. -type ocToInternalMetricsConverter struct { - metricsConsumer consumer.MetricsConsumer -} - -// ConsumeMetrics takes new-style data.MetricData method, converts it to OC and uses old-style ConsumeMetricsData method -// to process the metrics data. -func (tc *ocToInternalMetricsConverter) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { - metricsData := pdatautil.MetricsFromMetricsData([]consumerdata.MetricsData{md}) - err := tc.metricsConsumer.ConsumeMetrics(ctx, metricsData) - if err != nil { - return err - } - return nil -} - -var _ consumer.MetricsConsumerOld = (*ocToInternalMetricsConverter)(nil) diff --git a/consumer/converter/converter_test.go b/consumer/converter/converter_test.go deleted file mode 100644 index 8f0372cb5bfb..000000000000 --- a/consumer/converter/converter_test.go +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
-package converter - -import ( - "context" - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - - "go.opentelemetry.io/collector/consumer/pdatautil" - "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/internal/data/testdata" - "go.opentelemetry.io/collector/translator/internaldata" -) - -func TestNewInternalToOCTraceConverter(t *testing.T) { - td := testdata.GenerateTraceDataTwoSpansSameResourceOneDifferent() - traceExporterOld := new(exportertest.SinkTraceExporterOld) - converter := NewInternalToOCTraceConverter(traceExporterOld) - - err := converter.ConsumeTraces(context.Background(), td) - assert.NoError(t, err) - - ocTraces := traceExporterOld.AllTraces() - assert.Equal(t, len(ocTraces), 2) - assert.EqualValues(t, ocTraces, internaldata.TraceDataToOC(td)) - - traceExporterOld.SetConsumeTraceError(fmt.Errorf("consumer error")) - err = converter.ConsumeTraces(context.Background(), td) - assert.NotNil(t, err) -} - -func TestNewInternalToOCMetricsConverter(t *testing.T) { - md := testdata.GenerateMetricDataOneMetric() - metricsExporterOld := new(exportertest.SinkMetricsExporterOld) - converter := NewInternalToOCMetricsConverter(metricsExporterOld) - - err := converter.ConsumeMetrics(context.Background(), pdatautil.MetricsFromInternalMetrics(md)) - assert.NoError(t, err) - - ocMetrics := metricsExporterOld.AllMetrics() - assert.Equal(t, len(ocMetrics), 1) - assert.EqualValues(t, ocMetrics, internaldata.MetricDataToOC(md)) - - metricsExporterOld.SetConsumeMetricsError(fmt.Errorf("consumer error")) - err = converter.ConsumeMetrics(context.Background(), pdatautil.MetricsFromInternalMetrics(md)) - assert.NotNil(t, err) -} - -func TestNewOCTraceToInternalTraceConverter(t *testing.T) { - td := testdata.GenerateTraceDataOneSpan() - ocTraceData := internaldata.TraceDataToOC(td)[0] - traceExporter := new(exportertest.SinkTraceExporter) - converter := NewOCToInternalTraceConverter(traceExporter) - - err := converter.ConsumeTraceData(context.Background(), ocTraceData) - assert.NoError(t, err) - err = converter.ConsumeTraceData(context.Background(), ocTraceData) - assert.NoError(t, err) - - ocTraces := traceExporter.AllTraces() - assert.Equal(t, len(ocTraces), 2) - assert.EqualValues(t, ocTraces[0], td) - - traceExporter.SetConsumeTraceError(fmt.Errorf("consumer error")) - err = converter.ConsumeTraceData(context.Background(), ocTraceData) - assert.NotNil(t, err) -} - -func TestNewOCToInternalMetricsConverter(t *testing.T) { - md := testdata.GenerateMetricDataOneMetric() - ocMetricData := internaldata.MetricDataToOC(md)[0] - metricsExporter := new(exportertest.SinkMetricsExporter) - converter := NewOCToInternalMetricsConverter(metricsExporter) - - err := converter.ConsumeMetricsData(context.Background(), ocMetricData) - assert.NoError(t, err) - err = converter.ConsumeMetricsData(context.Background(), ocMetricData) - assert.NoError(t, err) - - ocMetrics := metricsExporter.AllMetrics() - assert.Equal(t, len(ocMetrics), 2) - assert.EqualValues(t, pdatautil.MetricsToInternalMetrics(ocMetrics[0]), md) - assert.EqualValues(t, pdatautil.MetricsToInternalMetrics(ocMetrics[1]), md) - - metricsExporter.SetConsumeMetricsError(fmt.Errorf("consumer error")) - err = converter.ConsumeMetricsData(context.Background(), ocMetricData) - assert.NotNil(t, err) -} diff --git a/exporter/exportertest/nop_exporter.go b/exporter/exportertest/nop_exporter.go index 02eea8c70df6..fbc8f517e76a 100644 --- a/exporter/exportertest/nop_exporter.go +++ 
b/exporter/exportertest/nop_exporter.go @@ -53,7 +53,7 @@ func (ne *nopExporter) Shutdown(context.Context) error { return nil } -// NewNopTraceExporterOld creates an TraceExporter that just drops the received data. +// NewNopTraceExporter creates an TraceExporter that just drops the received data. func NewNopTraceExporter() component.TraceExporter { ne := &nopExporter{ name: nopTraceExporterName, @@ -61,7 +61,7 @@ func NewNopTraceExporter() component.TraceExporter { return ne } -// NewNopMetricsExporterOld creates an MetricsExporter that just drops the received data. +// NewNopMetricsExporter creates an MetricsExporter that just drops the received data. func NewNopMetricsExporter() component.MetricsExporter { ne := &nopExporter{ name: nopMetricsExporterName, @@ -69,7 +69,7 @@ func NewNopMetricsExporter() component.MetricsExporter { return ne } -// NewNopLogsExporterOld creates an LogsExporter that just drops the received data. +// NewNopLogsExporter creates an LogsExporter that just drops the received data. func NewNopLogsExporter() component.LogsExporter { ne := &nopExporter{ name: nopLogsExporterName, diff --git a/exporter/exportertest/sink_exporter.go b/exporter/exportertest/sink_exporter.go index 6cde9db8d8aa..d73308f0eb77 100644 --- a/exporter/exportertest/sink_exporter.go +++ b/exporter/exportertest/sink_exporter.go @@ -20,60 +20,10 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerdata" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" ) -// SinkTraceExporterOld acts as a trace receiver for use in tests. -type SinkTraceExporterOld struct { - consumeTraceError error // to be returned by ConsumeTraceData, if set - mu sync.Mutex - traces []consumerdata.TraceData -} - -// Start tells the exporter to start. The exporter may prepare for exporting -// by connecting to the endpoint. Host parameter can be used for communicating -// with the host after Start() has already returned. -func (ste *SinkTraceExporterOld) Start(context.Context, component.Host) error { - return nil -} - -// ConsumeTraceData stores traces for tests. -func (ste *SinkTraceExporterOld) ConsumeTraceData(_ context.Context, td consumerdata.TraceData) error { - ste.mu.Lock() - defer ste.mu.Unlock() - - if ste.consumeTraceError != nil { - return ste.consumeTraceError - } - - ste.traces = append(ste.traces, td) - - return nil -} - -// AllTraces returns the traces sent to the test sink. -func (ste *SinkTraceExporterOld) AllTraces() []consumerdata.TraceData { - ste.mu.Lock() - defer ste.mu.Unlock() - - return ste.traces -} - -// SetConsumeTraceError sets an error that will be returned by ConsumeTraceData -func (ste *SinkTraceExporterOld) SetConsumeTraceError(err error) { - ste.mu.Lock() - defer ste.mu.Unlock() - - ste.consumeTraceError = err -} - -// Shutdown stops the exporter and is invoked during shutdown. -func (ste *SinkTraceExporterOld) Shutdown(context.Context) error { - return nil -} - // SinkTraceExporter acts as a trace receiver for use in tests. type SinkTraceExporter struct { mu sync.Mutex @@ -142,55 +92,6 @@ func (ste *SinkTraceExporter) Shutdown(context.Context) error { return nil } -// SinkMetricsExporterOld acts as a metrics receiver for use in tests. -type SinkMetricsExporterOld struct { - mu sync.Mutex - consumeMetricsError error // to be returned by ConsumeMetricsData, if set - metrics []consumerdata.MetricsData -} - -// Start tells the exporter to start. 
The exporter may prepare for exporting -// by connecting to the endpoint. Host parameter can be used for communicating -// with the host after Start() has already returned. -func (sme *SinkMetricsExporterOld) Start(context.Context, component.Host) error { - return nil -} - -// ConsumeMetricsData stores traces for tests. -func (sme *SinkMetricsExporterOld) ConsumeMetricsData(_ context.Context, md consumerdata.MetricsData) error { - sme.mu.Lock() - defer sme.mu.Unlock() - - if sme.consumeMetricsError != nil { - return sme.consumeMetricsError - } - - sme.metrics = append(sme.metrics, md) - - return nil -} - -// AllMetrics returns the metrics sent to the test sink. -func (sme *SinkMetricsExporterOld) AllMetrics() []consumerdata.MetricsData { - sme.mu.Lock() - defer sme.mu.Unlock() - - return sme.metrics -} - -// SetConsumeMetricsError sets an error that will be returned by ConsumeMetricsData -func (sme *SinkMetricsExporterOld) SetConsumeMetricsError(err error) { - sme.mu.Lock() - defer sme.mu.Unlock() - - sme.consumeMetricsError = err -} - -// Shutdown stops the exporter and is invoked during shutdown. -func (sme *SinkMetricsExporterOld) Shutdown(context.Context) error { - return nil -} - // SinkMetricsExporter acts as a metrics receiver for use in tests. type SinkMetricsExporter struct { mu sync.Mutex diff --git a/exporter/exportertest/sink_exporter_test.go b/exporter/exportertest/sink_exporter_test.go index aa15a3c7e8a4..127c2f7490f2 100644 --- a/exporter/exportertest/sink_exporter_test.go +++ b/exporter/exportertest/sink_exporter_test.go @@ -18,45 +18,15 @@ import ( "errors" "testing" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" - tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/consumer/consumerdata" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/internal/data/testdata" ) -func TestSinkTraceExporterOld(t *testing.T) { - sink := new(SinkTraceExporterOld) - require.NoError(t, sink.Start(context.Background(), componenttest.NewNopHost())) - td := consumerdata.TraceData{ - Spans: make([]*tracepb.Span, 7), - } - want := make([]consumerdata.TraceData, 0, 7) - for i := 0; i < 7; i++ { - require.NoError(t, sink.ConsumeTraceData(context.Background(), td)) - want = append(want, td) - } - assert.Equal(t, want, sink.AllTraces()) - require.NoError(t, sink.Shutdown(context.Background())) -} - -func TestSinkTraceExporterOld_Error(t *testing.T) { - sink := new(SinkTraceExporterOld) - require.NoError(t, sink.Start(context.Background(), componenttest.NewNopHost())) - sink.SetConsumeTraceError(errors.New("my error")) - td := consumerdata.TraceData{ - Spans: make([]*tracepb.Span, 7), - } - require.Error(t, sink.ConsumeTraceData(context.Background(), td)) - assert.Len(t, sink.AllTraces(), 0) - require.NoError(t, sink.Shutdown(context.Background())) -} - func TestSinkTraceExporter(t *testing.T) { sink := new(SinkTraceExporter) require.NoError(t, sink.Start(context.Background(), componenttest.NewNopHost())) @@ -85,33 +55,6 @@ func TestSinkTraceExporter_Error(t *testing.T) { require.NoError(t, sink.Shutdown(context.Background())) } -func TestSinkMetricsExporterOld(t *testing.T) { - sink := new(SinkMetricsExporterOld) - require.NoError(t, sink.Start(context.Background(), componenttest.NewNopHost())) - md := 
consumerdata.MetricsData{ - Metrics: make([]*metricspb.Metric, 7), - } - want := make([]consumerdata.MetricsData, 0, 7) - for i := 0; i < 7; i++ { - require.NoError(t, sink.ConsumeMetricsData(context.Background(), md)) - want = append(want, md) - } - assert.Equal(t, want, sink.AllMetrics()) - require.NoError(t, sink.Shutdown(context.Background())) -} - -func TestSinkMetricsExporterOld_Error(t *testing.T) { - sink := new(SinkMetricsExporterOld) - require.NoError(t, sink.Start(context.Background(), componenttest.NewNopHost())) - sink.SetConsumeMetricsError(errors.New("my error")) - md := consumerdata.MetricsData{ - Metrics: make([]*metricspb.Metric, 7), - } - require.Error(t, sink.ConsumeMetricsData(context.Background(), md)) - assert.Len(t, sink.AllMetrics(), 0) - require.NoError(t, sink.Shutdown(context.Background())) -} - func TestSinkMetricsExporter(t *testing.T) { sink := new(SinkMetricsExporter) require.NoError(t, sink.Start(context.Background(), componenttest.NewNopHost())) diff --git a/processor/cloningfanoutconnector.go b/processor/cloningfanoutconnector.go index 7021e796dfab..ac45c7d97313 100644 --- a/processor/cloningfanoutconnector.go +++ b/processor/cloningfanoutconnector.go @@ -17,15 +17,8 @@ package processor import ( "context" - commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" - resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" - tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" - "google.golang.org/protobuf/proto" - "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerdata" - "go.opentelemetry.io/collector/consumer/converter" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" ) @@ -35,65 +28,13 @@ import ( // clones of data before fanning out, which ensures each consumer gets their // own copy of data and is free to modify it. -// CreateMetricsCloningFanOutConnector is a placeholder function for now. -// It supposed to create an old type connector or a new type connector based on type of provided metrics consumer. -func CreateMetricsCloningFanOutConnector(mcs []consumer.MetricsConsumerBase) consumer.MetricsConsumerBase { +// NewMetricsCloningFanOutConnector wraps multiple new type metrics consumers in a single one and clones the data +// before fanning out. +func NewMetricsCloningFanOutConnector(mcs []consumer.MetricsConsumer) consumer.MetricsConsumer { if len(mcs) == 1 { // Don't wrap if no need to do it. 
return mcs[0] } - metricsConsumersOld := make([]consumer.MetricsConsumerOld, 0, len(mcs)) - metricsConsumers := make([]consumer.MetricsConsumer, 0, len(mcs)) - allMetricsConsumersOld := true - for _, mc := range mcs { - if metricsConsumer, ok := mc.(consumer.MetricsConsumer); ok { - allMetricsConsumersOld = false - metricsConsumers = append(metricsConsumers, metricsConsumer) - } else { - metricsConsumerOld := mc.(consumer.MetricsConsumerOld) - metricsConsumersOld = append(metricsConsumersOld, metricsConsumerOld) - metricsConsumers = append(metricsConsumers, converter.NewInternalToOCMetricsConverter(metricsConsumerOld)) - } - } - - if allMetricsConsumersOld { - return newMetricsCloningFanOutConnectorOld(metricsConsumersOld) - } - return newMetricsCloningFanOutConnector(metricsConsumers) -} - -func newMetricsCloningFanOutConnectorOld(mcs []consumer.MetricsConsumerOld) consumer.MetricsConsumerOld { - return metricsCloningFanOutConnectorOld(mcs) -} - -type metricsCloningFanOutConnectorOld []consumer.MetricsConsumerOld - -var _ consumer.MetricsConsumerOld = (*metricsCloningFanOutConnectorOld)(nil) - -// ConsumeMetricsData exports the MetricsData to all consumers wrapped by the current one. -func (mfc metricsCloningFanOutConnectorOld) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { - var errs []error - - // Fan out to first len-1 consumers. - for i := 0; i < len(mfc)-1; i++ { - // Create a clone of data. We need to clone because consumers may modify the data. - if err := mfc[i].ConsumeMetricsData(ctx, pdatautil.CloneMetricsDataOld(md)); err != nil { - errs = append(errs, err) - } - } - - if len(mfc) > 0 { - // Give the original data to the last consumer. - lastTc := mfc[len(mfc)-1] - if err := lastTc.ConsumeMetricsData(ctx, md); err != nil { - errs = append(errs, err) - } - } - - return componenterror.CombineErrors(errs) -} - -func newMetricsCloningFanOutConnector(mcs []consumer.MetricsConsumer) consumer.MetricsConsumer { return metricsCloningFanOutConnector(mcs) } @@ -125,74 +66,22 @@ func (mfc metricsCloningFanOutConnector) ConsumeMetrics(ctx context.Context, md return componenterror.CombineErrors(errs) } -// CreateTraceCloningFanOutConnector is a placeholder function for now. -// It supposed to create an old type connector or a new type connector based on type of provided trace consumer. -func CreateTraceCloningFanOutConnector(tcs []consumer.TraceConsumerBase) consumer.TraceConsumerBase { +// NewTracesCloningFanOutConnector wraps multiple new type traces consumers in a single one and clones the data +// before fanning out. +func NewTracesCloningFanOutConnector(tcs []consumer.TraceConsumer) consumer.TraceConsumer { if len(tcs) == 1 { // Don't wrap if no need to do it. 
return tcs[0] } - traceConsumersOld := make([]consumer.TraceConsumerOld, 0, len(tcs)) - traceConsumers := make([]consumer.TraceConsumer, 0, len(tcs)) - allTraceConsumersOld := true - for _, tc := range tcs { - if traceConsumer, ok := tc.(consumer.TraceConsumer); ok { - allTraceConsumersOld = false - traceConsumers = append(traceConsumers, traceConsumer) - } else { - traceConsumerOld := tc.(consumer.TraceConsumerOld) - traceConsumersOld = append(traceConsumersOld, traceConsumerOld) - traceConsumers = append(traceConsumers, converter.NewInternalToOCTraceConverter(traceConsumerOld)) - } - } - - if allTraceConsumersOld { - return newTraceCloningFanOutConnectorOld(traceConsumersOld) - } - return newTraceCloningFanOutConnector(traceConsumers) -} - -func newTraceCloningFanOutConnectorOld(tcs []consumer.TraceConsumerOld) consumer.TraceConsumerOld { - return traceCloningFanOutConnectorOld(tcs) -} - -type traceCloningFanOutConnectorOld []consumer.TraceConsumerOld - -var _ consumer.TraceConsumerOld = (*traceCloningFanOutConnectorOld)(nil) - -// ConsumeTraceData exports the span data to all trace consumers wrapped by the current one. -func (tfc traceCloningFanOutConnectorOld) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { - var errs []error - - // Fan out to first len-1 consumers. - for i := 0; i < len(tfc)-1; i++ { - // Create a clone of data. We need to clone because consumers may modify the data. - if err := tfc[i].ConsumeTraceData(ctx, cloneTraceDataOld(td)); err != nil { - errs = append(errs, err) - } - } - - if len(tfc) > 0 { - // Give the original data to the last consumer. - lastTc := tfc[len(tfc)-1] - if err := lastTc.ConsumeTraceData(ctx, td); err != nil { - errs = append(errs, err) - } - } - - return componenterror.CombineErrors(errs) + return tracesCloningFanOutConnector(tcs) } -func newTraceCloningFanOutConnector(tcs []consumer.TraceConsumer) consumer.TraceConsumer { - return traceCloningFanOutConnector(tcs) -} - -type traceCloningFanOutConnector []consumer.TraceConsumer +type tracesCloningFanOutConnector []consumer.TraceConsumer -var _ consumer.TraceConsumer = (*traceCloningFanOutConnector)(nil) +var _ consumer.TraceConsumer = (*tracesCloningFanOutConnector)(nil) // ConsumeTraceData exports the span data to all trace consumers wrapped by the current one. -func (tfc traceCloningFanOutConnector) ConsumeTraces(ctx context.Context, td pdata.Traces) error { +func (tfc tracesCloningFanOutConnector) ConsumeTraces(ctx context.Context, td pdata.Traces) error { var errs []error // Fan out to first len-1 consumers. @@ -215,17 +104,21 @@ func (tfc traceCloningFanOutConnector) ConsumeTraces(ctx context.Context, td pda return componenterror.CombineErrors(errs) } -// NewLogCloningFanOutConnector wraps multiple trace consumers in a single one. -func NewLogCloningFanOutConnector(lcs []consumer.LogsConsumer) consumer.LogsConsumer { - return LogCloningFanOutConnector(lcs) +// NewLogsCloningFanOutConnector wraps multiple trace consumers in a single one. +func NewLogsCloningFanOutConnector(lcs []consumer.LogsConsumer) consumer.LogsConsumer { + if len(lcs) == 1 { + // Don't wrap if no need to do it. + return lcs[0] + } + return logsCloningFanOutConnector(lcs) } -type LogCloningFanOutConnector []consumer.LogsConsumer +type logsCloningFanOutConnector []consumer.LogsConsumer -var _ consumer.LogsConsumer = (*LogCloningFanOutConnector)(nil) +var _ consumer.LogsConsumer = (*logsCloningFanOutConnector)(nil) // ConsumeLogs exports the span data to all consumers wrapped by the current one. 
-func (lfc LogCloningFanOutConnector) ConsumeLogs(ctx context.Context, ld pdata.Logs) error { +func (lfc logsCloningFanOutConnector) ConsumeLogs(ctx context.Context, ld pdata.Logs) error { var errs []error // Fan out to first len-1 consumers. @@ -247,22 +140,3 @@ func (lfc LogCloningFanOutConnector) ConsumeLogs(ctx context.Context, ld pdata.L return componenterror.CombineErrors(errs) } - -func cloneTraceDataOld(td consumerdata.TraceData) consumerdata.TraceData { - clone := consumerdata.TraceData{ - SourceFormat: td.SourceFormat, - Node: proto.Clone(td.Node).(*commonpb.Node), - Resource: proto.Clone(td.Resource).(*resourcepb.Resource), - } - - if td.Spans != nil { - clone.Spans = make([]*tracepb.Span, 0, len(td.Spans)) - - for _, span := range td.Spans { - spanClone := proto.Clone(span).(*tracepb.Span) - clone.Spans = append(clone.Spans, spanClone) - } - } - - return clone -} diff --git a/processor/cloningfanoutconnector_test.go b/processor/cloningfanoutconnector_test.go index cfeb5900621d..63fa654b14a1 100644 --- a/processor/cloningfanoutconnector_test.go +++ b/processor/cloningfanoutconnector_test.go @@ -16,150 +16,22 @@ package processor import ( "context" - "math/rand" - "strconv" "testing" - commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" - resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" - tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/stretchr/testify/assert" - "google.golang.org/protobuf/proto" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerdata" - "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" + "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal/data/testdata" - "go.opentelemetry.io/collector/translator/conventions" ) -func TestTraceProcessorCloningMultiplexingOld(t *testing.T) { - processors := make([]consumer.TraceConsumerOld, 3) - for i := range processors { - processors[i] = &mockTraceConsumerOld{} - } - - tfc := newTraceCloningFanOutConnectorOld(processors) - td := consumerdata.TraceData{ - Spans: make([]*tracepb.Span, 7), - Resource: &resourcepb.Resource{ - Type: "testtype", - }, - } - - var wantSpansCount = 0 - for i := 0; i < 2; i++ { - wantSpansCount += len(td.Spans) - err := tfc.ConsumeTraceData(context.Background(), td) - if err != nil { - t.Errorf("Wanted nil got error") - return - } - } - - for i, p := range processors { - m := p.(*mockTraceConsumerOld) - assert.Equal(t, wantSpansCount, m.TotalSpans) - if i < len(processors)-1 { - assert.True(t, td.Resource != m.Traces[0].Resource) - } else { - assert.True(t, td.Resource == m.Traces[0].Resource) - } - assert.True(t, proto.Equal(td.Resource, m.Traces[0].Resource)) - } -} - -func TestMetricsProcessorCloningMultiplexingOld(t *testing.T) { - processors := make([]consumer.MetricsConsumerOld, 3) - for i := range processors { - processors[i] = &mockMetricsConsumerOld{} - } - - mfc := newMetricsCloningFanOutConnectorOld(processors) - md := consumerdata.MetricsData{ - Metrics: make([]*metricspb.Metric, 7), - Resource: &resourcepb.Resource{ - Type: "testtype", - }, - } - - var wantMetricsCount = 0 - for i := 0; i < 2; i++ { - wantMetricsCount += len(md.Metrics) - err := mfc.ConsumeMetricsData(context.Background(), md) - if err != nil { - t.Errorf("Wanted nil got error") - return - } - } - - for i, p 
:= range processors { - m := p.(*mockMetricsConsumerOld) - assert.Equal(t, wantMetricsCount, m.TotalMetrics) - if i < len(processors)-1 { - assert.True(t, md.Resource != m.Metrics[0].Resource) - } else { - assert.True(t, md.Resource == m.Metrics[0].Resource) - } - assert.True(t, proto.Equal(md.Resource, m.Metrics[0].Resource)) - } -} - -func Benchmark100SpanCloneOld(b *testing.B) { - - b.StopTimer() - - name := tracepb.TruncatableString{Value: "testspanname"} - traceData := consumerdata.TraceData{ - SourceFormat: "test-source-format", - Node: &commonpb.Node{ - ServiceInfo: &commonpb.ServiceInfo{ - Name: "servicename", - }, - }, - Resource: &resourcepb.Resource{ - Type: "resourcetype", - }, - } - for i := 0; i < 100; i++ { - span := &tracepb.Span{ - TraceId: genRandBytes(16), - SpanId: genRandBytes(8), - Name: &name, - Attributes: &tracepb.Span_Attributes{ - AttributeMap: map[string]*tracepb.AttributeValue{}, - }, - } - - for j := 0; j < 5; j++ { - span.Attributes.AttributeMap["intattr"+strconv.Itoa(j)] = &tracepb.AttributeValue{ - Value: &tracepb.AttributeValue_IntValue{IntValue: int64(i)}, - } - span.Attributes.AttributeMap["strattr"+strconv.Itoa(j)] = &tracepb.AttributeValue{ - Value: &tracepb.AttributeValue_StringValue{ - StringValue: &tracepb.TruncatableString{Value: string(genRandBytes(20))}, - }, - } - } - - traceData.Spans = append(traceData.Spans, span) - } - - b.StartTimer() - - for i := 0; i < b.N; i++ { - cloneTraceDataOld(traceData) - } -} - func TestTraceProcessorCloningNotMultiplexing(t *testing.T) { - processors := []consumer.TraceConsumerBase{ - &mockTraceConsumer{}, + processors := []consumer.TraceConsumer{ + new(exportertest.SinkTraceExporter), } - tfc := CreateTraceCloningFanOutConnector(processors) + tfc := NewTracesCloningFanOutConnector(processors) assert.Same(t, processors[0], tfc) } @@ -167,10 +39,10 @@ func TestTraceProcessorCloningNotMultiplexing(t *testing.T) { func TestTraceProcessorCloningMultiplexing(t *testing.T) { processors := make([]consumer.TraceConsumer, 3) for i := range processors { - processors[i] = &mockTraceConsumer{} + processors[i] = new(exportertest.SinkTraceExporter) } - tfc := newTraceCloningFanOutConnector(processors) + tfc := NewTracesCloningFanOutConnector(processors) td := testdata.GenerateTraceDataTwoSpansSameResource() var wantSpansCount = 0 @@ -184,28 +56,29 @@ func TestTraceProcessorCloningMultiplexing(t *testing.T) { } for i, p := range processors { - m := p.(*mockTraceConsumer) - assert.Equal(t, wantSpansCount, m.TotalSpans) + m := p.(*exportertest.SinkTraceExporter) + assert.Equal(t, wantSpansCount, m.SpansCount()) spanOrig := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0) - spanClone := m.Traces[0].ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0) + allTraces := m.AllTraces() + spanClone := allTraces[0].ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0) if i < len(processors)-1 { - assert.True(t, td.ResourceSpans().At(0).Resource() != m.Traces[0].ResourceSpans().At(0).Resource()) + assert.True(t, td.ResourceSpans().At(0).Resource() != allTraces[0].ResourceSpans().At(0).Resource()) assert.True(t, spanOrig != spanClone) } else { - assert.True(t, td.ResourceSpans().At(0).Resource() == m.Traces[0].ResourceSpans().At(0).Resource()) + assert.True(t, td.ResourceSpans().At(0).Resource() == allTraces[0].ResourceSpans().At(0).Resource()) assert.True(t, spanOrig == spanClone) } - assert.EqualValues(t, td.ResourceSpans().At(0).Resource(), 
m.Traces[0].ResourceSpans().At(0).Resource()) + assert.EqualValues(t, td.ResourceSpans().At(0).Resource(), allTraces[0].ResourceSpans().At(0).Resource()) assert.EqualValues(t, spanOrig, spanClone) } } func TestMetricsProcessorCloningNotMultiplexing(t *testing.T) { - processors := []consumer.MetricsConsumerBase{ - &mockMetricsConsumer{}, + processors := []consumer.MetricsConsumer{ + new(exportertest.SinkMetricsExporter), } - tfc := CreateMetricsCloningFanOutConnector(processors) + tfc := NewMetricsCloningFanOutConnector(processors) assert.Same(t, processors[0], tfc) } @@ -213,10 +86,10 @@ func TestMetricsProcessorCloningNotMultiplexing(t *testing.T) { func TestMetricsProcessorCloningMultiplexing(t *testing.T) { processors := make([]consumer.MetricsConsumer, 3) for i := range processors { - processors[i] = &mockMetricsConsumer{} + processors[i] = new(exportertest.SinkMetricsExporter) } - mfc := newMetricsCloningFanOutConnector(processors) + mfc := NewMetricsCloningFanOutConnector(processors) md := testdata.GenerateMetricDataWithCountersHistogramAndSummary() var wantMetricsCount = 0 @@ -230,133 +103,19 @@ func TestMetricsProcessorCloningMultiplexing(t *testing.T) { } for i, p := range processors { - m := p.(*mockMetricsConsumer) - assert.Equal(t, wantMetricsCount, m.TotalMetrics) + m := p.(*exportertest.SinkMetricsExporter) + assert.Equal(t, wantMetricsCount, m.MetricsCount()) metricOrig := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0) - metricClone := pdatautil.MetricsToInternalMetrics(*m.Metrics[0]).ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0) + allMetrics := m.AllMetrics() + metricClone := pdatautil.MetricsToInternalMetrics(allMetrics[0]).ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0) if i < len(processors)-1 { - assert.True(t, md.ResourceMetrics().At(0).Resource() != pdatautil.MetricsToInternalMetrics(*m.Metrics[0]).ResourceMetrics().At(0).Resource()) + assert.True(t, md.ResourceMetrics().At(0).Resource() != pdatautil.MetricsToInternalMetrics(allMetrics[0]).ResourceMetrics().At(0).Resource()) assert.True(t, metricOrig != metricClone) } else { - assert.True(t, md.ResourceMetrics().At(0).Resource() == pdatautil.MetricsToInternalMetrics(*m.Metrics[0]).ResourceMetrics().At(0).Resource()) + assert.True(t, md.ResourceMetrics().At(0).Resource() == pdatautil.MetricsToInternalMetrics(allMetrics[0]).ResourceMetrics().At(0).Resource()) assert.True(t, metricOrig == metricClone) } - assert.EqualValues(t, md.ResourceMetrics().At(0).Resource(), pdatautil.MetricsToInternalMetrics(*m.Metrics[0]).ResourceMetrics().At(0).Resource()) + assert.EqualValues(t, md.ResourceMetrics().At(0).Resource(), pdatautil.MetricsToInternalMetrics(allMetrics[0]).ResourceMetrics().At(0).Resource()) assert.EqualValues(t, metricOrig, metricClone) } } - -func Benchmark100SpanClone(b *testing.B) { - - b.StopTimer() - - name := tracepb.TruncatableString{Value: "testspanname"} - traceData := consumerdata.TraceData{ - SourceFormat: "test-source-format", - Node: &commonpb.Node{ - ServiceInfo: &commonpb.ServiceInfo{ - Name: "servicename", - }, - }, - Resource: &resourcepb.Resource{ - Type: "resourcetype", - }, - } - for i := 0; i < 100; i++ { - span := &tracepb.Span{ - TraceId: genRandBytes(16), - SpanId: genRandBytes(8), - Name: &name, - Attributes: &tracepb.Span_Attributes{ - AttributeMap: map[string]*tracepb.AttributeValue{}, - }, - } - - for j := 0; j < 5; j++ { - span.Attributes.AttributeMap["intattr"+strconv.Itoa(j)] = 
&tracepb.AttributeValue{ - Value: &tracepb.AttributeValue_IntValue{IntValue: int64(i)}, - } - span.Attributes.AttributeMap["strattr"+strconv.Itoa(j)] = &tracepb.AttributeValue{ - Value: &tracepb.AttributeValue_StringValue{ - StringValue: &tracepb.TruncatableString{Value: string(genRandBytes(20))}, - }, - } - } - - traceData.Spans = append(traceData.Spans, span) - } - - b.StartTimer() - - for i := 0; i < b.N; i++ { - cloneTraceDataOld(traceData) - } -} - -func TestCreateTraceCloningFanOutConnectorWithConvertion(t *testing.T) { - traceConsumerOld := &mockTraceConsumerOld{} - traceConsumer := &mockTraceConsumer{} - processors := []consumer.TraceConsumerBase{ - traceConsumerOld, - traceConsumer, - } - - resourceTypeName := "good-resource" - - td := testdata.GenerateTraceDataTwoSpansSameResource() - resource := td.ResourceSpans().At(0).Resource() - resource.Attributes().Upsert(conventions.OCAttributeResourceType, pdata.NewAttributeValueString(resourceTypeName)) - - tfc := CreateTraceCloningFanOutConnector(processors).(consumer.TraceConsumer) - - var wantSpansCount = 0 - for i := 0; i < 2; i++ { - wantSpansCount += td.SpanCount() - err := tfc.ConsumeTraces(context.Background(), td) - assert.NoError(t, err) - } - - assert.Equal(t, wantSpansCount, traceConsumerOld.TotalSpans) - assert.Equal(t, resourceTypeName, traceConsumerOld.Traces[0].Resource.Type) - - assert.Equal(t, wantSpansCount, traceConsumer.TotalSpans) - assert.Equal(t, resource, traceConsumer.Traces[0].ResourceSpans().At(0).Resource()) -} - -func TestCreateMetricsCloningFanOutConnectorWithConvertion(t *testing.T) { - metricsConsumerOld := &mockMetricsConsumerOld{} - metricsConsumer := &mockMetricsConsumer{} - processors := []consumer.MetricsConsumerBase{ - metricsConsumerOld, - metricsConsumer, - } - - resourceTypeName := "good-resource" - - md := testdata.GenerateMetricDataWithCountersHistogramAndSummary() - resource := md.ResourceMetrics().At(0).Resource() - resource.Attributes().Upsert(conventions.OCAttributeResourceType, pdata.NewAttributeValueString(resourceTypeName)) - - mfc := CreateMetricsCloningFanOutConnector(processors).(consumer.MetricsConsumer) - - var wantSpansCount = 0 - for i := 0; i < 2; i++ { - wantSpansCount += md.MetricCount() - err := mfc.ConsumeMetrics(context.Background(), pdatautil.MetricsFromInternalMetrics(md)) - assert.NoError(t, err) - } - - assert.Equal(t, wantSpansCount, metricsConsumerOld.TotalMetrics) - assert.Equal(t, resourceTypeName, metricsConsumerOld.Metrics[0].Resource.Type) - - assert.Equal(t, wantSpansCount, metricsConsumer.TotalMetrics) - assert.Equal(t, resource, pdatautil.MetricsToInternalMetrics(*metricsConsumer.Metrics[0]).ResourceMetrics().At(0).Resource()) -} - -func genRandBytes(len int) []byte { - b := make([]byte, len) - for i := range b { - b[i] = byte(rand.Intn(256)) - } - return b -} diff --git a/processor/fanoutconnector.go b/processor/fanoutconnector.go index a6a886a33a4a..438ab8195c01 100644 --- a/processor/fanoutconnector.go +++ b/processor/fanoutconnector.go @@ -19,60 +19,18 @@ import ( "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerdata" - "go.opentelemetry.io/collector/consumer/converter" "go.opentelemetry.io/collector/consumer/pdata" ) // This file contains implementations of Trace/Metrics connectors // that fan out the data to multiple other consumers. -// CreateMetricsFanOutConnector creates a connector based on provided type of trace consumer. 
-// If any of the wrapped metrics consumers are of the new type, use metricsFanOutConnector, -// otherwise use the old type connector. -func CreateMetricsFanOutConnector(mcs []consumer.MetricsConsumerBase) consumer.MetricsConsumerBase { - metricsConsumersOld := make([]consumer.MetricsConsumerOld, 0, len(mcs)) - metricsConsumers := make([]consumer.MetricsConsumer, 0, len(mcs)) - allMetricsConsumersOld := true - for _, mc := range mcs { - if metricsConsumer, ok := mc.(consumer.MetricsConsumer); ok { - allMetricsConsumersOld = false - metricsConsumers = append(metricsConsumers, metricsConsumer) - } else { - metricsConsumerOld := mc.(consumer.MetricsConsumerOld) - metricsConsumersOld = append(metricsConsumersOld, metricsConsumerOld) - metricsConsumers = append(metricsConsumers, converter.NewInternalToOCMetricsConverter(metricsConsumerOld)) - } - } - - if allMetricsConsumersOld { - return NewMetricsFanOutConnectorOld(metricsConsumersOld) - } - return NewMetricsFanOutConnector(metricsConsumers) -} - -// NewMetricsFanOutConnectorOld wraps multiple metrics consumers in a single one. -func NewMetricsFanOutConnectorOld(mcs []consumer.MetricsConsumerOld) consumer.MetricsConsumerOld { - return metricsFanOutConnectorOld(mcs) -} - -type metricsFanOutConnectorOld []consumer.MetricsConsumerOld - -var _ consumer.MetricsConsumerOld = (*metricsFanOutConnectorOld)(nil) - -// ConsumeMetricsData exports the MetricsData to all consumers wrapped by the current one. -func (mfc metricsFanOutConnectorOld) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { - var errs []error - for _, mc := range mfc { - if err := mc.ConsumeMetricsData(ctx, md); err != nil { - errs = append(errs, err) - } - } - return componenterror.CombineErrors(errs) -} - // NewMetricsFanOutConnector wraps multiple new type metrics consumers in a single one. func NewMetricsFanOutConnector(mcs []consumer.MetricsConsumer) consumer.MetricsConsumer { + if len(mcs) == 1 { + // Don't wrap if no need to do it. + return mcs[0] + } return metricsFanOutConnector(mcs) } @@ -91,52 +49,12 @@ func (mfc metricsFanOutConnector) ConsumeMetrics(ctx context.Context, md pdata.M return componenterror.CombineErrors(errs) } -// CreateTraceFanOutConnector wraps multiple trace consumers in a single one. -// If any of the wrapped trace consumers are of the new type, use traceFanOutConnector, -// otherwise use the old type connector -func CreateTraceFanOutConnector(tcs []consumer.TraceConsumerBase) consumer.TraceConsumerBase { - traceConsumersOld := make([]consumer.TraceConsumerOld, 0, len(tcs)) - traceConsumers := make([]consumer.TraceConsumer, 0, len(tcs)) - allTraceConsumersOld := true - for _, tc := range tcs { - if traceConsumer, ok := tc.(consumer.TraceConsumer); ok { - allTraceConsumersOld = false - traceConsumers = append(traceConsumers, traceConsumer) - } else { - traceConsumerOld := tc.(consumer.TraceConsumerOld) - traceConsumersOld = append(traceConsumersOld, traceConsumerOld) - traceConsumers = append(traceConsumers, converter.NewInternalToOCTraceConverter(traceConsumerOld)) - } +// NewTracesFanOutConnector wraps multiple new type trace consumers in a single one. +func NewTracesFanOutConnector(tcs []consumer.TraceConsumer) consumer.TraceConsumer { + if len(tcs) == 1 { + // Don't wrap if no need to do it. 
+ return tcs[0] } - - if allTraceConsumersOld { - return NewTraceFanOutConnectorOld(traceConsumersOld) - } - return NewTraceFanOutConnector(traceConsumers) -} - -// NewTraceFanOutConnectorOld wraps multiple trace consumers in a single one. -func NewTraceFanOutConnectorOld(tcs []consumer.TraceConsumerOld) consumer.TraceConsumerOld { - return traceFanOutConnectorOld(tcs) -} - -type traceFanOutConnectorOld []consumer.TraceConsumerOld - -var _ consumer.TraceConsumerOld = (*traceFanOutConnectorOld)(nil) - -// ConsumeTraceData exports the span data to all trace consumers wrapped by the current one. -func (tfc traceFanOutConnectorOld) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { - var errs []error - for _, tc := range tfc { - if err := tc.ConsumeTraceData(ctx, td); err != nil { - errs = append(errs, err) - } - } - return componenterror.CombineErrors(errs) -} - -// NewTraceFanOutConnector wraps multiple new type trace consumers in a single one. -func NewTraceFanOutConnector(tcs []consumer.TraceConsumer) consumer.TraceConsumer { return traceFanOutConnector(tcs) } @@ -155,17 +73,21 @@ func (tfc traceFanOutConnector) ConsumeTraces(ctx context.Context, td pdata.Trac return componenterror.CombineErrors(errs) } -// NewLogFanOutConnector wraps multiple new type consumers in a single one. -func NewLogFanOutConnector(lcs []consumer.LogsConsumer) consumer.LogsConsumer { - return LogFanOutConnector(lcs) +// NewLogsFanOutConnector wraps multiple new type consumers in a single one. +func NewLogsFanOutConnector(lcs []consumer.LogsConsumer) consumer.LogsConsumer { + if len(lcs) == 1 { + // Don't wrap if no need to do it. + return lcs[0] + } + return logsFanOutConnector(lcs) } -type LogFanOutConnector []consumer.LogsConsumer +type logsFanOutConnector []consumer.LogsConsumer -var _ consumer.LogsConsumer = (*LogFanOutConnector)(nil) +var _ consumer.LogsConsumer = (*logsFanOutConnector)(nil) // Consume exports the span data to all consumers wrapped by the current one. 
-func (fc LogFanOutConnector) ConsumeLogs(ctx context.Context, ld pdata.Logs) error { +func (fc logsFanOutConnector) ConsumeLogs(ctx context.Context, ld pdata.Logs) error { var errs []error for _, tc := range fc { if err := tc.ConsumeLogs(ctx, ld); err != nil { diff --git a/processor/fanoutconnector_test.go b/processor/fanoutconnector_test.go index 1da7ec8bd007..7d5032d44e8c 100644 --- a/processor/fanoutconnector_test.go +++ b/processor/fanoutconnector_test.go @@ -16,40 +16,30 @@ package processor import ( "context" - "fmt" + "errors" "testing" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" - resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" - tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerdata" - "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" - "go.opentelemetry.io/collector/internal/data" - "go.opentelemetry.io/collector/translator/conventions" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/internal/data/testdata" ) func TestTraceProcessorMultiplexing(t *testing.T) { - processors := make([]consumer.TraceConsumerOld, 3) + processors := make([]consumer.TraceConsumer, 3) for i := range processors { - processors[i] = &mockTraceConsumerOld{} + processors[i] = new(exportertest.SinkTraceExporter) } - tfc := NewTraceFanOutConnectorOld(processors) - td := consumerdata.TraceData{ - Spans: make([]*tracepb.Span, 7), - Resource: &resourcepb.Resource{ - Type: "testtype", - }, - } + tfc := NewTracesFanOutConnector(processors) + td := testdata.GenerateTraceDataOneSpan() var wantSpansCount = 0 for i := 0; i < 2; i++ { - wantSpansCount += len(td.Spans) - err := tfc.ConsumeTraceData(context.Background(), td) + wantSpansCount += td.SpanCount() + err := tfc.ConsumeTraces(context.Background(), td) if err != nil { t.Errorf("Wanted nil got error") return @@ -57,60 +47,53 @@ func TestTraceProcessorMultiplexing(t *testing.T) { } for _, p := range processors { - m := p.(*mockTraceConsumerOld) - assert.Equal(t, wantSpansCount, m.TotalSpans) - assert.True(t, td.Resource == m.Traces[0].Resource) + m := p.(*exportertest.SinkTraceExporter) + assert.Equal(t, wantSpansCount, m.SpansCount()) + assert.EqualValues(t, td, m.AllTraces()[0]) } } func TestTraceProcessorWhenOneErrors(t *testing.T) { - processors := make([]consumer.TraceConsumerOld, 3) + processors := make([]consumer.TraceConsumer, 3) for i := range processors { - processors[i] = &mockTraceConsumerOld{} + processors[i] = new(exportertest.SinkTraceExporter) } // Make one processor return error - processors[1].(*mockTraceConsumerOld).MustFail = true + processors[1].(*exportertest.SinkTraceExporter).SetConsumeTraceError(errors.New("my error")) - tfc := NewTraceFanOutConnectorOld(processors) - td := consumerdata.TraceData{ - Spans: make([]*tracepb.Span, 5), - } + tfc := NewTracesFanOutConnector(processors) + td := testdata.GenerateTraceDataOneSpan() var wantSpansCount = 0 for i := 0; i < 2; i++ { - wantSpansCount += len(td.Spans) - err := tfc.ConsumeTraceData(context.Background(), td) + wantSpansCount += td.SpanCount() + err := tfc.ConsumeTraces(context.Background(), td) if err == nil { t.Errorf("Wanted error got nil") return } } - for _, p := range processors { - m := p.(*mockTraceConsumerOld) - if m.TotalSpans != wantSpansCount { - t.Errorf("Wanted %d 
spans for every processor but got %d", wantSpansCount, m.TotalSpans) - return - } - } + assert.Equal(t, 0, processors[1].(*exportertest.SinkTraceExporter).SpansCount()) + assert.Equal(t, wantSpansCount, processors[0].(*exportertest.SinkTraceExporter).SpansCount()) + assert.Equal(t, wantSpansCount, processors[2].(*exportertest.SinkTraceExporter).SpansCount()) } func TestMetricsProcessorMultiplexing(t *testing.T) { - processors := make([]consumer.MetricsConsumerOld, 3) + processors := make([]consumer.MetricsConsumer, 3) for i := range processors { - processors[i] = &mockMetricsConsumerOld{} + processors[i] = new(exportertest.SinkMetricsExporter) } - mfc := NewMetricsFanOutConnectorOld(processors) - md := consumerdata.MetricsData{ - Metrics: make([]*metricspb.Metric, 7), - } + mfc := NewMetricsFanOutConnector(processors) + md := testdata.GenerateMetricDataOneMetric() + pmd := pdatautil.MetricsFromInternalMetrics(md) var wantMetricsCount = 0 for i := 0; i < 2; i++ { - wantMetricsCount += len(md.Metrics) - err := mfc.ConsumeMetricsData(context.Background(), md) + wantMetricsCount += md.MetricCount() + err := mfc.ConsumeMetrics(context.Background(), pmd) if err != nil { t.Errorf("Wanted nil got error") return @@ -118,192 +101,36 @@ func TestMetricsProcessorMultiplexing(t *testing.T) { } for _, p := range processors { - m := p.(*mockMetricsConsumerOld) - assert.Equal(t, wantMetricsCount, m.TotalMetrics) - assert.True(t, md.Resource == m.Metrics[0].Resource) + m := p.(*exportertest.SinkMetricsExporter) + assert.Equal(t, wantMetricsCount, m.MetricsCount()) + assert.EqualValues(t, md, pdatautil.MetricsToInternalMetrics(m.AllMetrics()[0])) } } func TestMetricsProcessorWhenOneErrors(t *testing.T) { - processors := make([]consumer.MetricsConsumerOld, 3) + processors := make([]consumer.MetricsConsumer, 3) for i := range processors { - processors[i] = &mockMetricsConsumerOld{} + processors[i] = new(exportertest.SinkMetricsExporter) } // Make one processor return error - processors[1].(*mockMetricsConsumerOld).MustFail = true + processors[1].(*exportertest.SinkMetricsExporter).SetConsumeMetricsError(errors.New("My Error")) - mfc := NewMetricsFanOutConnectorOld(processors) - md := consumerdata.MetricsData{ - Metrics: make([]*metricspb.Metric, 5), - } + mfc := NewMetricsFanOutConnector(processors) + md := testdata.GenerateMetricDataOneMetric() + pmd := pdatautil.MetricsFromInternalMetrics(md) var wantMetricsCount = 0 for i := 0; i < 2; i++ { - wantMetricsCount += len(md.Metrics) - err := mfc.ConsumeMetricsData(context.Background(), md) + wantMetricsCount += md.MetricCount() + err := mfc.ConsumeMetrics(context.Background(), pmd) if err == nil { t.Errorf("Wanted error got nil") return } } - for _, p := range processors { - m := p.(*mockMetricsConsumerOld) - if m.TotalMetrics != wantMetricsCount { - t.Errorf("Wanted %d metrics for every processor but got %d", wantMetricsCount, m.TotalMetrics) - return - } - } -} - -func TestCreateTraceFanOutConnectorWithConvertion(t *testing.T) { - traceConsumerOld := &mockTraceConsumerOld{} - traceConsumer := &mockTraceConsumer{} - processors := []consumer.TraceConsumerBase{ - traceConsumerOld, - traceConsumer, - } - - resourceTypeName := "good-resource" - - td := pdata.NewTraces() - rss := td.ResourceSpans() - rss.Resize(1) - rs0 := rss.At(0) - res := rs0.Resource() - res.InitEmpty() - res.Attributes().InitFromMap(map[string]pdata.AttributeValue{ - conventions.OCAttributeResourceType: pdata.NewAttributeValueString(resourceTypeName), - }) - 
rs0.InstrumentationLibrarySpans().Resize(1)
- rs0.InstrumentationLibrarySpans().At(0).Spans().Resize(3)
-
- tfc := CreateTraceFanOutConnector(processors).(consumer.TraceConsumer)
-
- var wantSpansCount = 0
- for i := 0; i < 2; i++ {
- wantSpansCount += td.SpanCount()
- err := tfc.ConsumeTraces(context.Background(), td)
- assert.NoError(t, err)
- }
-
- assert.Equal(t, wantSpansCount, traceConsumerOld.TotalSpans)
- assert.Equal(t, resourceTypeName, traceConsumerOld.Traces[0].Resource.Type)
-
- assert.Equal(t, wantSpansCount, traceConsumer.TotalSpans)
- assert.Equal(t, pdata.NewAttributeMap().InitFromMap(map[string]pdata.AttributeValue{
- conventions.OCAttributeResourceType: pdata.NewAttributeValueString(resourceTypeName),
- }), traceConsumer.Traces[0].ResourceSpans().At(0).Resource().Attributes())
-}
-
-func TestCreateMetricsFanOutConnectorWithConvertion(t *testing.T) {
- metricsConsumerOld := &mockMetricsConsumerOld{}
- metricsConsumer := &mockMetricsConsumer{}
- processors := []consumer.MetricsConsumerBase{
- metricsConsumerOld,
- metricsConsumer,
- }
-
- resourceTypeName := "good-resource"
-
- md := data.NewMetricData()
- rms := md.ResourceMetrics()
- rms.Resize(1)
- rm0 := rms.At(0)
- res := rm0.Resource()
- res.InitEmpty()
- res.Attributes().InitFromMap(map[string]pdata.AttributeValue{
- conventions.OCAttributeResourceType: pdata.NewAttributeValueString(resourceTypeName),
- })
- rm0.InstrumentationLibraryMetrics().Resize(1)
- rm0.InstrumentationLibraryMetrics().At(0).Metrics().Resize(4)
-
- mfc := CreateMetricsFanOutConnector(processors).(consumer.MetricsConsumer)
-
- var wantSpansCount = 0
- for i := 0; i < 2; i++ {
- wantSpansCount += md.MetricCount()
- err := mfc.ConsumeMetrics(context.Background(), pdatautil.MetricsFromInternalMetrics(md))
- assert.NoError(t, err)
- }
-
- assert.Equal(t, wantSpansCount, metricsConsumerOld.TotalMetrics)
- assert.Equal(t, resourceTypeName, metricsConsumerOld.Metrics[0].Resource.Type)
-
- assert.Equal(t, wantSpansCount, metricsConsumer.TotalMetrics)
- assert.Equal(t, pdata.NewAttributeMap().InitFromMap(map[string]pdata.AttributeValue{
- conventions.OCAttributeResourceType: pdata.NewAttributeValueString(resourceTypeName),
- }), pdatautil.MetricsToInternalMetrics(*metricsConsumer.Metrics[0]).ResourceMetrics().At(0).Resource().Attributes())
-}
-
-type mockTraceConsumerOld struct {
- Traces []*consumerdata.TraceData
- TotalSpans int
- MustFail bool
-}
-
-var _ consumer.TraceConsumerOld = &mockTraceConsumerOld{}
-
-func (p *mockTraceConsumerOld) ConsumeTraceData(_ context.Context, td consumerdata.TraceData) error {
- p.Traces = append(p.Traces, &td)
- p.TotalSpans += len(td.Spans)
- if p.MustFail {
- return fmt.Errorf("this processor must fail")
- }
-
- return nil
-}
-
-type mockTraceConsumer struct {
- Traces []*pdata.Traces
- TotalSpans int
- MustFail bool
-}
-
-var _ consumer.TraceConsumer = &mockTraceConsumer{}
-
-func (p *mockTraceConsumer) ConsumeTraces(_ context.Context, td pdata.Traces) error {
- p.Traces = append(p.Traces, &td)
- p.TotalSpans += td.SpanCount()
- if p.MustFail {
- return fmt.Errorf("this processor must fail")
- }
- return nil
-
-}
-
-type mockMetricsConsumerOld struct {
- Metrics []*consumerdata.MetricsData
- TotalMetrics int
- MustFail bool
-}
-
-var _ consumer.MetricsConsumerOld = &mockMetricsConsumerOld{}
-
-func (p *mockMetricsConsumerOld) ConsumeMetricsData(_ context.Context, md consumerdata.MetricsData) error {
- p.Metrics = append(p.Metrics, &md)
- p.TotalMetrics += len(md.Metrics)
- if p.MustFail {
- return fmt.Errorf("this processor must fail")
- }
-
- return nil
-}
-
-type mockMetricsConsumer struct {
- Metrics []*pdata.Metrics
- TotalMetrics int
- MustFail bool
-}
-
-var _ consumer.MetricsConsumer = &mockMetricsConsumer{}
-
-func (p *mockMetricsConsumer) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
- p.Metrics = append(p.Metrics, &md)
- p.TotalMetrics += pdatautil.MetricCount(md)
- if p.MustFail {
- return fmt.Errorf("this processor must fail")
- }
- return nil
+ assert.Equal(t, 0, processors[1].(*exportertest.SinkMetricsExporter).MetricsCount())
+ assert.Equal(t, wantMetricsCount, processors[0].(*exportertest.SinkMetricsExporter).MetricsCount())
+ assert.Equal(t, wantMetricsCount, processors[2].(*exportertest.SinkMetricsExporter).MetricsCount())
 }
diff --git a/service/builder/exporters_builder.go b/service/builder/exporters_builder.go
index 267047c91dde..87f1e1ff53c3 100644
--- a/service/builder/exporters_builder.go
+++ b/service/builder/exporters_builder.go
@@ -320,9 +320,6 @@ func createTraceExporter(
 ) (component.TraceExporter, error) {
 creationParams := component.ExporterCreateParams{Logger: logger}
 ctx := context.Background()
-
- // If exporter is of the new type (can manipulate on internal data structure),
- // use ExporterFactory.CreateTraceExporter.
 return factory.CreateTraceExporter(ctx, creationParams, cfg)
 }

@@ -334,22 +331,15 @@ func createMetricsExporter(
 ) (component.MetricsExporter, error) {
 creationParams := component.ExporterCreateParams{Logger: logger}
 ctx := context.Background()
-
- // If exporter is of the new type (can manipulate on internal data structure),
- // use ExporterFactory.CreateMetricsExporter.
 return factory.CreateMetricsExporter(ctx, creationParams, cfg)
 }

 // createLogsExporter creates a data exporter based on provided factory type.
 func createLogsExporter(
- factoryBase component.ExporterFactory,
+ factory component.ExporterFactory,
 logger *zap.Logger,
 cfg configmodels.Exporter,
 ) (component.LogsExporter, error) {
- factory, ok := factoryBase.(component.ExporterFactory)
- if !ok {
- return nil, fmt.Errorf("exporter %q does not support data type %q", factoryBase.Type(), configmodels.LogsDataType)
- }
 creationParams := component.ExporterCreateParams{Logger: logger}
 ctx := context.Background()
 return factory.CreateLogsExporter(ctx, creationParams, cfg)
diff --git a/service/builder/pipelines_builder.go b/service/builder/pipelines_builder.go
index b15371b35e89..df171fd97afa 100644
--- a/service/builder/pipelines_builder.go
+++ b/service/builder/pipelines_builder.go
@@ -24,7 +24,6 @@ import (
 "go.opentelemetry.io/collector/component/componenterror"
 "go.opentelemetry.io/collector/config/configmodels"
 "go.opentelemetry.io/collector/consumer"
- "go.opentelemetry.io/collector/consumer/converter"
 "go.opentelemetry.io/collector/processor"
 )

@@ -33,8 +32,8 @@ import (
 // processor in the pipeline or the exporter if pipeline has no processors).
 type builtPipeline struct {
 logger *zap.Logger
- firstTC consumer.TraceConsumerBase
- firstMC consumer.MetricsConsumerBase
+ firstTC consumer.TraceConsumer
+ firstMC consumer.MetricsConsumer
 firstLC consumer.LogsConsumer

 // MutatesConsumedData is set to true if any processors in the pipeline
@@ -122,8 +121,8 @@ func (pb *PipelinesBuilder) buildPipeline(pipelineCfg *configmodels.Pipeline,
 // BuildProcessors the pipeline backwards.
 // First create a consumer junction point that fans out the data to all exporters.
- var tc consumer.TraceConsumerBase
- var mc consumer.MetricsConsumerBase
+ var tc consumer.TraceConsumer
+ var mc consumer.MetricsConsumer
 var lc consumer.LogsConsumer

 switch pipelineCfg.InputType {
@@ -224,7 +223,7 @@ func (pb *PipelinesBuilder) getBuiltExportersByNames(exporterNames []string) []*
 return result
 }

-func (pb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []string) consumer.TraceConsumerBase {
+func (pb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []string) consumer.TraceConsumer {
 builtExporters := pb.getBuiltExportersByNames(exporterNames)

 // Optimize for the case when there is only one exporter, no need to create junction point.
@@ -232,16 +231,16 @@ func (pb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []st
 return builtExporters[0].te
 }

- var exporters []consumer.TraceConsumerBase
+ var exporters []consumer.TraceConsumer
 for _, builtExp := range builtExporters {
 exporters = append(exporters, builtExp.te)
 }

 // Create a junction point that fans out to all exporters.
- return processor.CreateTraceFanOutConnector(exporters)
+ return processor.NewTracesFanOutConnector(exporters)
 }

-func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.MetricsConsumerBase {
+func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.MetricsConsumer {
 builtExporters := pb.getBuiltExportersByNames(exporterNames)

 // Optimize for the case when there is only one exporter, no need to create junction point.
@@ -249,13 +248,13 @@ func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []
 return builtExporters[0].me
 }

- var exporters []consumer.MetricsConsumerBase
+ var exporters []consumer.MetricsConsumer
 for _, builtExp := range builtExporters {
 exporters = append(exporters, builtExp.me)
 }

 // Create a junction point that fans out to all exporters.
- return processor.CreateMetricsFanOutConnector(exporters)
+ return processor.NewMetricsFanOutConnector(exporters)
 }

 func (pb *PipelinesBuilder) buildFanoutExportersLogConsumer(
@@ -274,7 +273,7 @@ func (pb *PipelinesBuilder) buildFanoutExportersLogConsumer(
 }

 // Create a junction point that fans out to all exporters.
- return processor.NewLogFanOutConnector(exporters)
+ return processor.NewLogsFanOutConnector(exporters)
 }

 // createTraceProcessor creates trace processor based on type of the current processor
@@ -283,21 +282,11 @@ func createTraceProcessor(
 factory component.ProcessorFactory,
 logger *zap.Logger,
 cfg configmodels.Processor,
- nextConsumer consumer.TraceConsumerBase,
+ nextConsumer consumer.TraceConsumer,
 ) (component.TraceProcessor, error) {
 creationParams := component.ProcessorCreateParams{Logger: logger}
 ctx := context.Background()
-
- // If both processor and consumer are of the new type (can manipulate on internal data structure),
- // use ProcessorFactory.CreateTraceProcessor.
- if nextConsumer, ok := nextConsumer.(consumer.TraceConsumer); ok {
- return factory.CreateTraceProcessor(ctx, creationParams, nextConsumer, cfg)
- }
-
- // If processor is of the new type, but downstream consumer is of the old type,
- // use internalToOCTraceConverter compatibility shim.
- traceConverter := converter.NewInternalToOCTraceConverter(nextConsumer.(consumer.TraceConsumerOld))
- return factory.CreateTraceProcessor(ctx, creationParams, traceConverter, cfg)
+ return factory.CreateTraceProcessor(ctx, creationParams, nextConsumer, cfg)
 }

 // createMetricsProcessor creates metric processor based on type of the current processor
@@ -306,35 +295,20 @@ func createMetricsProcessor(
 factory component.ProcessorFactory,
 logger *zap.Logger,
 cfg configmodels.Processor,
- nextConsumer consumer.MetricsConsumerBase,
+ nextConsumer consumer.MetricsConsumer,
 ) (component.MetricsProcessor, error) {
 creationParams := component.ProcessorCreateParams{Logger: logger}
 ctx := context.Background()
-
- // If both processor and consumer are of the new type (can manipulate on internal data structure),
- // use ProcessorFactory.CreateMetricsProcessor.
- if nextConsumer, ok := nextConsumer.(consumer.MetricsConsumer); ok {
- return factory.CreateMetricsProcessor(ctx, creationParams, nextConsumer, cfg)
- }
-
- // If processor is of the new type, but downstream consumer is of the old type,
- // use internalToOCMetricsConverter compatibility shim.
- metricsConverter := converter.NewInternalToOCMetricsConverter(nextConsumer.(consumer.MetricsConsumerOld))
- return factory.CreateMetricsProcessor(ctx, creationParams, metricsConverter, cfg)
+ return factory.CreateMetricsProcessor(ctx, creationParams, nextConsumer, cfg)
 }

 // createLogsProcessor creates a log processor using given factory and next consumer.
 func createLogsProcessor(
- factoryBase component.Factory,
+ factory component.ProcessorFactory,
 logger *zap.Logger,
 cfg configmodels.Processor,
 nextConsumer consumer.LogsConsumer,
 ) (component.LogsProcessor, error) {
- factory, ok := factoryBase.(component.ProcessorFactory)
- if !ok {
- return nil, fmt.Errorf("processor %q does support data type %q",
- cfg.Name(), configmodels.LogsDataType)
- }
 creationParams := component.ProcessorCreateParams{Logger: logger}
 ctx := context.Background()
 return factory.CreateLogsProcessor(ctx, creationParams, cfg, nextConsumer)
diff --git a/service/builder/receivers_builder.go b/service/builder/receivers_builder.go
index e7ae476add8f..3e208c138706 100644
--- a/service/builder/receivers_builder.go
+++ b/service/builder/receivers_builder.go
@@ -26,7 +26,6 @@ import (
 "go.opentelemetry.io/collector/config/configerror"
 "go.opentelemetry.io/collector/config/configmodels"
 "go.opentelemetry.io/collector/consumer"
- "go.opentelemetry.io/collector/consumer/converter"
 "go.opentelemetry.io/collector/processor"
 )

@@ -272,13 +271,13 @@ func (rb *ReceiversBuilder) buildReceiver(logger *zap.Logger, config configmodel
 return rcv, nil
 }

-func buildFanoutTraceConsumer(pipelines []*builtPipeline) consumer.TraceConsumerBase {
+func buildFanoutTraceConsumer(pipelines []*builtPipeline) consumer.TraceConsumer {
 // Optimize for the case when there is only one processor, no need to create junction point.
 if len(pipelines) == 1 {
 return pipelines[0].firstTC
 }

- var pipelineConsumers []consumer.TraceConsumerBase
+ var pipelineConsumers []consumer.TraceConsumer
 anyPipelineMutatesData := false
 for _, pipeline := range pipelines {
 pipelineConsumers = append(pipelineConsumers, pipeline.firstTC)
@@ -292,18 +291,18 @@ func buildFanoutTraceConsumer(pipelines []*builtPipeline) consumer.TraceConsumer
 // TODO: if there are more than 2 pipelines only clone data for pipelines that
 // declare the intent to mutate the data. Pipelines that do not mutate the data
 // can consume shared data.
- return processor.CreateTraceCloningFanOutConnector(pipelineConsumers)
+ return processor.NewTracesCloningFanOutConnector(pipelineConsumers)
 }
- return processor.CreateTraceFanOutConnector(pipelineConsumers)
+ return processor.NewTracesFanOutConnector(pipelineConsumers)
 }

-func buildFanoutMetricConsumer(pipelines []*builtPipeline) consumer.MetricsConsumerBase {
+func buildFanoutMetricConsumer(pipelines []*builtPipeline) consumer.MetricsConsumer {
 // Optimize for the case when there is only one processor, no need to create junction point.
 if len(pipelines) == 1 {
 return pipelines[0].firstMC
 }

- var pipelineConsumers []consumer.MetricsConsumerBase
+ var pipelineConsumers []consumer.MetricsConsumer
 anyPipelineMutatesData := false
 for _, pipeline := range pipelines {
 pipelineConsumers = append(pipelineConsumers, pipeline.firstMC)
@@ -317,9 +316,9 @@ func buildFanoutMetricConsumer(pipelines []*builtPipeline) consumer.MetricsConsu
 // TODO: if there are more than 2 pipelines only clone data for pipelines that
 // declare the intent to mutate the data. Pipelines that do not mutate the data
 // can consume shared data.
- return processor.CreateMetricsCloningFanOutConnector(pipelineConsumers)
+ return processor.NewMetricsCloningFanOutConnector(pipelineConsumers)
 }
- return processor.CreateMetricsFanOutConnector(pipelineConsumers)
+ return processor.NewMetricsFanOutConnector(pipelineConsumers)
 }

 func buildFanoutLogConsumer(pipelines []*builtPipeline) consumer.LogsConsumer {
@@ -342,9 +341,9 @@ func buildFanoutLogConsumer(pipelines []*builtPipeline) consumer.LogsConsumer {
 // TODO: if there are more than 2 pipelines only clone data for pipelines that
 // declare the intent to mutate the data. Pipelines that do not mutate the data
 // can consume shared data.
- return processor.NewLogCloningFanOutConnector(pipelineConsumers)
+ return processor.NewLogsCloningFanOutConnector(pipelineConsumers)
 }
- return processor.NewLogFanOutConnector(pipelineConsumers)
+ return processor.NewLogsFanOutConnector(pipelineConsumers)
 }

 // createTraceReceiver is a helper function that creates trace receiver based on the current receiver type
@@ -354,19 +353,10 @@ func createTraceReceiver(
 factory component.ReceiverFactory,
 logger *zap.Logger,
 cfg configmodels.Receiver,
- nextConsumer consumer.TraceConsumerBase,
+ nextConsumer consumer.TraceConsumer,
 ) (component.TraceReceiver, error) {
 creationParams := component.ReceiverCreateParams{Logger: logger}
-
- // If consumer is of the new type (can manipulate on internal data structure),
- // use ProcessorFactory.CreateTraceReceiver.
- if nextConsumer, ok := nextConsumer.(consumer.TraceConsumer); ok {
- return factory.CreateTraceReceiver(ctx, creationParams, cfg, nextConsumer)
- }
-
- // If consumer is of the old type, use internalToOCTraceConverter compatibility shim.
- traceConverter := converter.NewInternalToOCTraceConverter(nextConsumer.(consumer.TraceConsumerOld))
- return factory.CreateTraceReceiver(ctx, creationParams, cfg, traceConverter)
+ return factory.CreateTraceReceiver(ctx, creationParams, cfg, nextConsumer)
 }

 // createMetricsReceiver is a helper function that creates metric receiver based
@@ -376,34 +366,20 @@ func createMetricsReceiver(
 factory component.ReceiverFactory,
 logger *zap.Logger,
 cfg configmodels.Receiver,
- nextConsumer consumer.MetricsConsumerBase,
+ nextConsumer consumer.MetricsConsumer,
 ) (component.MetricsReceiver, error) {
 creationParams := component.ReceiverCreateParams{Logger: logger}
-
- // If consumer is of the new type (can manipulate on internal data structure),
- // use ProcessorFactory.CreateTraceReceiver.
- if nextConsumer, ok := nextConsumer.(consumer.MetricsConsumer); ok {
- return factory.CreateMetricsReceiver(ctx, creationParams, cfg, nextConsumer)
- }
-
- // If consumer is of the old type, use internalToOCTraceConverter compatibility shim.
- metricsConverter := converter.NewInternalToOCMetricsConverter(nextConsumer.(consumer.MetricsConsumerOld))
- return factory.CreateMetricsReceiver(ctx, creationParams, cfg, metricsConverter)
+ return factory.CreateMetricsReceiver(ctx, creationParams, cfg, nextConsumer)
 }

 // createLogsReceiver creates a log receiver using given factory and next consumer.
 func createLogsReceiver(
 ctx context.Context,
- factoryBase component.Factory,
+ factory component.ReceiverFactory,
 logger *zap.Logger,
 cfg configmodels.Receiver,
 nextConsumer consumer.LogsConsumer,
 ) (component.LogsReceiver, error) {
- factory, ok := factoryBase.(component.ReceiverFactory)
- if !ok {
- return nil, fmt.Errorf("receiver %q does support data type %q",
- cfg.Name(), configmodels.LogsDataType)
- }
 creationParams := component.ReceiverCreateParams{Logger: logger}
 return factory.CreateLogsReceiver(ctx, creationParams, cfg, nextConsumer)
 }
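A minimal sketch of the wiring this change leaves behind: with the *Old interfaces and converter shims gone, every trace component implements the single pdata-based consumer.TraceConsumer, so fan-out construction needs no type assertions. Only the consumer.TraceConsumer and processor.NewTracesFanOutConnector signatures are taken from the change above; the countingConsumer type and the pdata.NewTraces() call are illustrative assumptions, not part of this patch.

package main

import (
	"context"

	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/consumer/pdata"
	"go.opentelemetry.io/collector/processor"
)

// countingConsumer is a hypothetical consumer used only for this sketch.
type countingConsumer struct {
	spans int
}

var _ consumer.TraceConsumer = (*countingConsumer)(nil)

// ConsumeTraces is the only trace entry point left after this change.
func (c *countingConsumer) ConsumeTraces(_ context.Context, td pdata.Traces) error {
	c.spans += td.SpanCount()
	return nil
}

func main() {
	a := &countingConsumer{}
	b := &countingConsumer{}
	// The fan-out connector now takes a plain []consumer.TraceConsumer;
	// there is no TraceConsumerBase element type and no converter shim.
	fanOut := processor.NewTracesFanOutConnector([]consumer.TraceConsumer{a, b})
	// pdata.NewTraces() is assumed here only to provide an empty payload.
	_ = fanOut.ConsumeTraces(context.Background(), pdata.NewTraces())
}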