From f64f3bd8a46ce3151090b87318a5438ab7575497 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu <bogdandrutu@gmail.com> Date: Tue, 29 Nov 2022 06:55:53 -0800 Subject: [PATCH] [chore] Move capabilities consumers in a separate package (#6632) This is to prepare to move "pipelines" as public, see https://github.com/open-telemetry/opentelemetry-collector/issues/5564 Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com> Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com> --- .../capabilities.go | 8 ++++---- .../capabilities_test.go | 14 +++++++------- service/internal/pipelines/pipelines.go | 7 ++++--- 3 files changed, 15 insertions(+), 14 deletions(-) rename service/internal/{pipelines => capabilityconsumer}/capabilities.go (77%) rename service/internal/{pipelines => capabilityconsumer}/capabilities_test.go (86%) diff --git a/service/internal/pipelines/capabilities.go b/service/internal/capabilityconsumer/capabilities.go similarity index 77% rename from service/internal/pipelines/capabilities.go rename to service/internal/capabilityconsumer/capabilities.go index 6414f239f56..089955d19f8 100644 --- a/service/internal/pipelines/capabilities.go +++ b/service/internal/capabilityconsumer/capabilities.go @@ -12,13 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pipelines // import "go.opentelemetry.io/collector/service/internal/pipelines" +package capabilityconsumer // import "go.opentelemetry.io/collector/service/internal/capabilityconsumer" import ( "go.opentelemetry.io/collector/consumer" ) -func wrapLogs(logs consumer.Logs, cap consumer.Capabilities) consumer.Logs { +func NewLogs(logs consumer.Logs, cap consumer.Capabilities) consumer.Logs { return capLogs{Logs: logs, cap: cap} } @@ -31,7 +31,7 @@ func (mts capLogs) Capabilities() consumer.Capabilities { return mts.cap } -func wrapMetrics(metrics consumer.Metrics, cap consumer.Capabilities) consumer.Metrics { +func NewMetrics(metrics consumer.Metrics, cap consumer.Capabilities) consumer.Metrics { return capMetrics{Metrics: metrics, cap: cap} } @@ -44,7 +44,7 @@ func (mts capMetrics) Capabilities() consumer.Capabilities { return mts.cap } -func wrapTraces(traces consumer.Traces, cap consumer.Capabilities) consumer.Traces { +func NewTraces(traces consumer.Traces, cap consumer.Capabilities) consumer.Traces { return capTraces{Traces: traces, cap: cap} } diff --git a/service/internal/pipelines/capabilities_test.go b/service/internal/capabilityconsumer/capabilities_test.go similarity index 86% rename from service/internal/pipelines/capabilities_test.go rename to service/internal/capabilityconsumer/capabilities_test.go index cf91d436621..2e9ba798aa8 100644 --- a/service/internal/pipelines/capabilities_test.go +++ b/service/internal/capabilityconsumer/capabilities_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pipelines +package capabilityconsumer import ( "context" @@ -26,11 +26,11 @@ import ( "go.opentelemetry.io/collector/internal/testdata" ) -func TestWrapLogs(t *testing.T) { +func TestLogs(t *testing.T) { sink := &consumertest.LogsSink{} require.Equal(t, consumer.Capabilities{MutatesData: false}, sink.Capabilities()) - wrap := wrapLogs(sink, consumer.Capabilities{MutatesData: true}) + wrap := NewLogs(sink, consumer.Capabilities{MutatesData: true}) assert.Equal(t, consumer.Capabilities{MutatesData: true}, wrap.Capabilities()) assert.NoError(t, wrap.ConsumeLogs(context.Background(), testdata.GenerateLogs(1))) @@ -38,11 +38,11 @@ func TestWrapLogs(t *testing.T) { assert.Equal(t, testdata.GenerateLogs(1), sink.AllLogs()[0]) } -func TestWrapMetrics(t *testing.T) { +func TestMetrics(t *testing.T) { sink := &consumertest.MetricsSink{} require.Equal(t, consumer.Capabilities{MutatesData: false}, sink.Capabilities()) - wrap := wrapMetrics(sink, consumer.Capabilities{MutatesData: true}) + wrap := NewMetrics(sink, consumer.Capabilities{MutatesData: true}) assert.Equal(t, consumer.Capabilities{MutatesData: true}, wrap.Capabilities()) assert.NoError(t, wrap.ConsumeMetrics(context.Background(), testdata.GenerateMetrics(1))) @@ -50,11 +50,11 @@ func TestWrapMetrics(t *testing.T) { assert.Equal(t, testdata.GenerateMetrics(1), sink.AllMetrics()[0]) } -func TestWrapTraces(t *testing.T) { +func TestTraces(t *testing.T) { sink := &consumertest.TracesSink{} require.Equal(t, consumer.Capabilities{MutatesData: false}, sink.Capabilities()) - wrap := wrapTraces(sink, consumer.Capabilities{MutatesData: true}) + wrap := NewTraces(sink, consumer.Capabilities{MutatesData: true}) assert.Equal(t, consumer.Capabilities{MutatesData: true}, wrap.Capabilities()) assert.NoError(t, wrap.ConsumeTraces(context.Background(), testdata.GenerateTraces(1))) diff --git a/service/internal/pipelines/pipelines.go b/service/internal/pipelines/pipelines.go index 9b4e2579021..f67c3faf17b 100644 --- a/service/internal/pipelines/pipelines.go +++ b/service/internal/pipelines/pipelines.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/service/internal/capabilityconsumer" "go.opentelemetry.io/collector/service/internal/components" "go.opentelemetry.io/collector/service/internal/fanoutconsumer" "go.opentelemetry.io/collector/service/internal/zpages" @@ -282,11 +283,11 @@ func Build(ctx context.Context, set Settings) (*Pipelines, error) { // Because of this wrap the first consumer if any consumers in the pipeline mutate the data and the first says that it doesn't. switch pipelineID.Type() { case component.DataTypeTraces: - bp.lastConsumer = capTraces{Traces: bp.lastConsumer.(consumer.Traces), cap: consumer.Capabilities{MutatesData: mutatesConsumedData}} + bp.lastConsumer = capabilityconsumer.NewTraces(bp.lastConsumer.(consumer.Traces), consumer.Capabilities{MutatesData: mutatesConsumedData}) case component.DataTypeMetrics: - bp.lastConsumer = capMetrics{Metrics: bp.lastConsumer.(consumer.Metrics), cap: consumer.Capabilities{MutatesData: mutatesConsumedData}} + bp.lastConsumer = capabilityconsumer.NewMetrics(bp.lastConsumer.(consumer.Metrics), consumer.Capabilities{MutatesData: mutatesConsumedData}) case component.DataTypeLogs: - bp.lastConsumer = capLogs{Logs: bp.lastConsumer.(consumer.Logs), cap: consumer.Capabilities{MutatesData: mutatesConsumedData}} + bp.lastConsumer = capabilityconsumer.NewLogs(bp.lastConsumer.(consumer.Logs), consumer.Capabilities{MutatesData: mutatesConsumedData}) default: return nil, fmt.Errorf("create cap consumer in pipeline %q, data type %q is not supported", pipelineID, pipelineID.Type()) }