diff --git a/connector/servicegraphconnector/connector_test.go b/connector/servicegraphconnector/connector_test.go
index 8cdecfa761de..1c0ecac51e7f 100644
--- a/connector/servicegraphconnector/connector_test.go
+++ b/connector/servicegraphconnector/connector_test.go
@@ -571,8 +571,8 @@ func TestValidateOwnTelemetry(t *testing.T) {
 	}
 
 	mockMetricsExporter := newMockMetricsExporter()
-	set := metadatatest.SetupTelemetry()
-	p, err := newConnector(set.NewTelemetrySettings(), cfg, mockMetricsExporter)
+	tel := componenttest.NewTelemetry()
+	p, err := newConnector(tel.NewTelemetrySettings(), cfg, mockMetricsExporter)
 	require.NoError(t, err)
 
 	assert.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
@@ -594,18 +594,7 @@ func TestValidateOwnTelemetry(t *testing.T) {
 
 	// Shutdown the connector
 	assert.NoError(t, p.Shutdown(context.Background()))
-	set.AssertMetrics(t, []metricdata.Metrics{
-		{
-			Name:        "otelcol_connector_servicegraph_total_edges",
-			Description: "Total number of unique edges",
-			Unit:        "1",
-			Data: metricdata.Sum[int64]{
-				Temporality: metricdata.CumulativeTemporality,
-				IsMonotonic: true,
-				DataPoints: []metricdata.DataPoint[int64]{
-					{Value: 2},
-				},
-			},
-		},
-	}, metricdatatest.IgnoreTimestamp())
-	require.NoError(t, set.Shutdown(context.Background()))
+	metadatatest.AssertEqualConnectorServicegraphTotalEdges(t, tel, []metricdata.DataPoint[int64]{
+		{Value: 2},
+	}, metricdatatest.IgnoreTimestamp())
+	require.NoError(t, tel.Shutdown(context.Background()))
diff --git a/exporter/loadbalancingexporter/telemetry_utils_test.go b/exporter/loadbalancingexporter/telemetry_utils_test.go
index c90079d7e0a1..d44302e2b327 100644
--- a/exporter/loadbalancingexporter/telemetry_utils_test.go
+++ b/exporter/loadbalancingexporter/telemetry_utils_test.go
@@ -6,16 +6,14 @@ package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry
 import (
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/exporter"
+	"go.opentelemetry.io/collector/exporter/exportertest"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadatatest"
 )
 
 func getTelemetryAssets(t require.TestingT) (exporter.Settings, *metadata.TelemetryBuilder) {
-	s := metadatatest.SetupTelemetry()
-	st := s.NewSettings()
-	ts := st.TelemetrySettings
-	tb, err := metadata.NewTelemetryBuilder(ts)
+	s := exportertest.NewNopSettings()
+	tb, err := metadata.NewTelemetryBuilder(s.TelemetrySettings)
 	require.NoError(t, err)
-	return st, tb
+	return s, tb
 }
diff --git a/processor/filterprocessor/logs_test.go b/processor/filterprocessor/logs_test.go
index d62d524aee7d..40f4c837d71b 100644
--- a/processor/filterprocessor/logs_test.go
+++ b/processor/filterprocessor/logs_test.go
@@ -10,6 +10,7 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/component/componenttest"
 	"go.opentelemetry.io/collector/consumer/consumertest"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/plog"
@@ -768,8 +769,9 @@ func TestFilterLogProcessorWithOTTL(t *testing.T) {
 }
 
 func TestFilterLogProcessorTelemetry(t *testing.T) {
-	tel := metadatatest.SetupTelemetry()
-	processor, err := newFilterLogsProcessor(tel.NewSettings(), &Config{
+	tel := componenttest.NewTelemetry()
+	t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) })
+	processor, err := newFilterLogsProcessor(metadatatest.NewSettings(tel), &Config{
 		Logs: LogFilters{LogConditions: []string{`IsMatch(body, "operationA")`}},
 	})
 	assert.NoError(t, err)
@@ -777,26 +779,12 @@ func TestFilterLogProcessorTelemetry(t *testing.T) {
 	_, err = processor.processLogs(context.Background(), constructLogs())
 	assert.NoError(t, err)
 
-	want := []metricdata.Metrics{
+	metadatatest.AssertEqualProcessorFilterDatapointsFiltered(t, tel, []metricdata.DataPoint[int64]{
 		{
-			Name:        "otelcol_processor_filter_logs.filtered",
-			Description: "Number of logs dropped by the filter processor",
-			Unit:        "1",
-			Data: metricdata.Sum[int64]{
-				Temporality: metricdata.CumulativeTemporality,
-				IsMonotonic: true,
-				DataPoints: []metricdata.DataPoint[int64]{
-					{
-						Value:      2,
-						Attributes: attribute.NewSet(attribute.String("filter", "filter")),
-					},
-				},
-			},
+			Value:      2,
+			Attributes: attribute.NewSet(attribute.String("filter", "filter")),
 		},
-	}
-
-	tel.AssertMetrics(t, want, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreTimestamp())
-	require.NoError(t, tel.Shutdown(context.Background()))
+	}, metricdatatest.IgnoreTimestamp())
 }
 
 func constructLogs() plog.Logs {
diff --git a/processor/filterprocessor/metrics_test.go b/processor/filterprocessor/metrics_test.go
index f913b009cf31..b55b8337d55e 100644
--- a/processor/filterprocessor/metrics_test.go
+++ b/processor/filterprocessor/metrics_test.go
@@ -11,6 +11,7 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/component/componenttest"
 	"go.opentelemetry.io/collector/consumer/consumertest"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/pmetric"
@@ -370,7 +371,7 @@ func TestFilterMetricProcessor(t *testing.T) {
 }
 
 func TestFilterMetricProcessorTelemetry(t *testing.T) {
-	tel := metadatatest.SetupTelemetry()
+	tel := componenttest.NewTelemetry()
 	cfg := &Config{
 		Metrics: MetricFilters{
 			MetricConditions: []string{
@@ -378,10 +379,7 @@ func TestFilterMetricProcessorTelemetry(t *testing.T) {
 			},
 		},
 	}
-	fmp, err := newFilterMetricProcessor(
-		tel.NewSettings(),
-		cfg,
-	)
+	fmp, err := newFilterMetricProcessor(metadatatest.NewSettings(tel), cfg)
 	assert.NotNil(t, fmp)
 	assert.NoError(t, err)
 
@@ -395,24 +393,12 @@ func TestFilterMetricProcessorTelemetry(t *testing.T) {
 	}))
 	assert.NoError(t, err)
 
-	want := []metricdata.Metrics{
+	metadatatest.AssertEqualProcessorFilterDatapointsFiltered(t, tel, []metricdata.DataPoint[int64]{
 		{
-			Name:        "otelcol_processor_filter_datapoints.filtered",
-			Description: "Number of metric data points dropped by the filter processor",
-			Unit:        "1",
-			Data: metricdata.Sum[int64]{
-				Temporality: metricdata.CumulativeTemporality,
-				IsMonotonic: true,
-				DataPoints: []metricdata.DataPoint[int64]{
-					{
-						Value:      1,
-						Attributes: attribute.NewSet(attribute.String("filter", "filter")),
-					},
-				},
-			},
+			Value:      1,
+			Attributes: attribute.NewSet(attribute.String("filter", "filter")),
 		},
-	}
-	tel.AssertMetrics(t, want, metricdatatest.IgnoreTimestamp())
+	}, metricdatatest.IgnoreTimestamp())
 
 	require.NoError(t, tel.Shutdown(context.Background()))
 }
diff --git a/processor/tailsamplingprocessor/processor_decisions_test.go b/processor/tailsamplingprocessor/processor_decisions_test.go
index a57b8858a84b..260c724a38c1 100644
--- a/processor/tailsamplingprocessor/processor_decisions_test.go
+++ b/processor/tailsamplingprocessor/processor_decisions_test.go
@@ -11,11 +11,11 @@ import (
 	"go.opentelemetry.io/collector/component/componenttest"
 	"go.opentelemetry.io/collector/consumer/consumertest"
 	"go.opentelemetry.io/collector/pdata/ptrace"
+	"go.opentelemetry.io/collector/processor/processortest"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/metric"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/cache"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/metadatatest"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
 )
 
@@ -25,7 +25,6 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {
 		NumTraces: defaultNumTraces,
 	}
 	nextConsumer := new(consumertest.TracesSink)
-	tel := metadatatest.SetupTelemetry()
 	idb := newSyncIDBatcher()
 
 	mpe1 := &mockPolicyEvaluator{}
@@ -34,7 +33,7 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {
 		{name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
 	}
 
-	p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
+	p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
 	require.NoError(t, err)
 	require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
 
@@ -61,7 +60,6 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {
 
 	// The final decision SHOULD be Sampled.
 	require.EqualValues(t, 1, nextConsumer.SpanCount())
-	require.NoError(t, tel.Shutdown(context.Background()))
 }
 
 func TestSamplingPolicyInvertSampled(t *testing.T) {
@@ -70,7 +68,6 @@ func TestSamplingPolicyInvertSampled(t *testing.T) {
 		NumTraces: defaultNumTraces,
 	}
 	nextConsumer := new(consumertest.TracesSink)
-	tel := metadatatest.SetupTelemetry()
 	idb := newSyncIDBatcher()
 
 	mpe1 := &mockPolicyEvaluator{}
@@ -79,7 +76,7 @@ func TestSamplingPolicyInvertSampled(t *testing.T) {
 		{name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
 	}
 
-	p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
+	p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
 	require.NoError(t, err)
 	require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
 
@@ -106,7 +103,6 @@ func TestSamplingPolicyInvertSampled(t *testing.T) {
 
 	// The final decision SHOULD be Sampled.
 	require.EqualValues(t, 1, nextConsumer.SpanCount())
-	require.NoError(t, tel.Shutdown(context.Background()))
 }
 
 func TestSamplingMultiplePolicies(t *testing.T) {
@@ -115,7 +111,6 @@ func TestSamplingMultiplePolicies(t *testing.T) {
 		NumTraces: defaultNumTraces,
 	}
 	nextConsumer := new(consumertest.TracesSink)
-	tel := metadatatest.SetupTelemetry()
 	idb := newSyncIDBatcher()
 
 	mpe1 := &mockPolicyEvaluator{}
@@ -126,7 +121,7 @@ func TestSamplingMultiplePolicies(t *testing.T) {
 		{name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))},
 	}
 
-	p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
+	p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
 	require.NoError(t, err)
 	require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
 
@@ -157,7 +152,6 @@ func TestSamplingMultiplePolicies(t *testing.T) {
 
 	// The final decision SHOULD be Sampled.
 	require.EqualValues(t, 1, nextConsumer.SpanCount())
-	require.NoError(t, tel.Shutdown(context.Background()))
 }
 
 func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
@@ -166,7 +160,6 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
 		NumTraces: defaultNumTraces,
 	}
 	nextConsumer := new(consumertest.TracesSink)
-	tel := metadatatest.SetupTelemetry()
 	idb := newSyncIDBatcher()
 
 	mpe1 := &mockPolicyEvaluator{}
@@ -175,7 +168,7 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
 		{name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
 	}
 
-	p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
+	p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
 	require.NoError(t, err)
 	require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
 
@@ -203,7 +196,6 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
 
 	// The final decision SHOULD be NotSampled.
 	require.EqualValues(t, 0, nextConsumer.SpanCount())
-	require.NoError(t, tel.Shutdown(context.Background()))
 }
 
 func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
@@ -212,7 +204,6 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
 		NumTraces: defaultNumTraces,
 	}
 	nextConsumer := new(consumertest.TracesSink)
-	tel := metadatatest.SetupTelemetry()
 	idb := newSyncIDBatcher()
 
 	mpe1 := &mockPolicyEvaluator{}
@@ -223,7 +214,7 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
 		{name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))},
 	}
 
-	p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
+	p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
 	require.NoError(t, err)
 	require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
 
@@ -254,7 +245,6 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
 
 	// The final decision SHOULD be NotSampled.
 	require.EqualValues(t, 0, nextConsumer.SpanCount())
-	require.NoError(t, tel.Shutdown(context.Background()))
 }
 
 func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
@@ -263,7 +253,6 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
 		NumTraces: defaultNumTraces,
 	}
 	nextConsumer := new(consumertest.TracesSink)
-	tel := metadatatest.SetupTelemetry()
 	idb := newSyncIDBatcher()
 
 	mpe1 := &mockPolicyEvaluator{}
@@ -274,7 +263,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
 		{name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))},
 	}
 
-	p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
+	p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
 	require.NoError(t, err)
 	require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
 
@@ -324,7 +313,6 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
 	require.EqualValues(t, 1, mpe1.EvaluationCount)
 	require.EqualValues(t, 1, mpe2.EvaluationCount)
 	require.EqualValues(t, 0, nextConsumer.SpanCount(), "original final decision not honored")
-	require.NoError(t, tel.Shutdown(context.Background()))
 }
 
 func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
@@ -333,7 +321,6 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
 		NumTraces: defaultNumTraces,
 	}
 	nextConsumer := new(consumertest.TracesSink)
-	tel := metadatatest.SetupTelemetry()
 	idb := newSyncIDBatcher()
 
 	mpe := &mockPolicyEvaluator{}
@@ -344,7 +331,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
 	// Use this instead of the default no-op cache
 	c, err := cache.NewLRUDecisionCache[bool](200)
 	require.NoError(t, err)
-	p, err := newTracesProcessor(context.Background(), tel.NewSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withSampledDecisionCache(c))
+	p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withSampledDecisionCache(c))
 	require.NoError(t, err)
 	require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
 
@@ -397,7 +384,6 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
 	require.NoError(t, p.ConsumeTraces(context.Background(), spanIndexToTraces(2)))
 	require.EqualValues(t, 1, mpe.EvaluationCount)
 	require.EqualValues(t, 2, nextConsumer.SpanCount(), "original final decision not honored")
-	require.NoError(t, tel.Shutdown(context.Background()))
 }
 
 func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) {
@@ -406,8 +392,6 @@ func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) {
 		NumTraces: defaultNumTraces,
 	}
 	nextConsumer := new(consumertest.TracesSink)
-	s := metadatatest.SetupTelemetry()
-	ct := s.NewSettings()
 	idb := newSyncIDBatcher()
 
 	mpe := &mockPolicyEvaluator{}
@@ -418,7 +402,7 @@ func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) {
 	// Use this instead of the default no-op cache
 	c, err := cache.NewLRUDecisionCache[bool](200)
 	require.NoError(t, err)
-	p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withNonSampledDecisionCache(c))
+	p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withNonSampledDecisionCache(c))
 	require.NoError(t, err)
 	require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
 
diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go
index 2a391a91dde0..e4d44c580c7f 100644
--- a/processor/tailsamplingprocessor/processor_test.go
+++ b/processor/tailsamplingprocessor/processor_test.go
@@ -25,7 +25,6 @@ import (
 	"go.uber.org/zap/zaptest/observer"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/idbatcher"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/metadatatest"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
 )
 
@@ -124,8 +123,6 @@ func TestTraceIntegrity(t *testing.T) {
 		NumTraces: defaultNumTraces,
 	}
 	nextConsumer := new(consumertest.TracesSink)
-	s := metadatatest.SetupTelemetry()
-	ct := s.NewSettings()
 	idb := newSyncIDBatcher()
 
 	mpe1 := &mockPolicyEvaluator{}
@@ -134,7 +131,7 @@ func TestTraceIntegrity(t *testing.T) {
 		{name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
 	}
 
-	p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
+	p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
 	require.NoError(t, err)
 	require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
 
@@ -391,12 +388,10 @@ func TestMultipleBatchesAreCombinedIntoOne(t *testing.T) {
 			},
 		},
 	}
-	s := metadatatest.SetupTelemetry()
-	ct := s.NewSettings()
 	idb := newSyncIDBatcher()
 
 	msp := new(consumertest.TracesSink)
-	p, err := newTracesProcessor(context.Background(), ct, msp, cfg, withDecisionBatcher(idb))
+	p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), msp, cfg, withDecisionBatcher(idb))
 	require.NoError(t, err)
 	require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
 
@@ -463,12 +458,10 @@ func TestSetSamplingPolicy(t *testing.T) {
 			},
 		},
 	}
-	s := metadatatest.SetupTelemetry()
-	ct := s.NewSettings()
 	idb := newSyncIDBatcher()
 
 	msp := new(consumertest.TracesSink)
-	p, err := newTracesProcessor(context.Background(), ct, msp, cfg, withDecisionBatcher(idb))
+	p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), msp, cfg, withDecisionBatcher(idb))
 	require.NoError(t, err)
 	require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
 
diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go
index 1f321162930d..3d6f7aa01285 100644
--- a/receiver/kafkareceiver/kafka_receiver_test.go
+++ b/receiver/kafkareceiver/kafka_receiver_test.go
@@ -152,7 +152,8 @@ func TestTracesReceiver_error(t *testing.T) {
 }
 
 func TestTracesConsumerGroupHandler(t *testing.T) {
-	tel := metadatatest.SetupTelemetry()
+	tel := componenttest.NewTelemetry()
+	t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) })
 	telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings())
 	require.NoError(t, err)
 
@@ -191,11 +192,11 @@ func TestTracesConsumerGroupHandler(t *testing.T) {
 	groupClaim.messageChan <- &sarama.ConsumerMessage{}
 	close(groupClaim.messageChan)
 	wg.Wait()
-	require.NoError(t, tel.Shutdown(context.Background()))
 }
 
 func TestTracesConsumerGroupHandler_session_done(t *testing.T) {
-	tel := metadatatest.SetupTelemetry()
+	tel := componenttest.NewTelemetry()
+	t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) })
 	telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings())
 	require.NoError(t, err)
 
@@ -236,13 +237,13 @@ func TestTracesConsumerGroupHandler_session_done(t *testing.T) {
 	groupClaim.messageChan <- &sarama.ConsumerMessage{}
 	cancelFunc()
 	wg.Wait()
-	require.NoError(t, tel.Shutdown(context.Background()))
 }
 
 func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) {
 	obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()})
 	require.NoError(t, err)
-	tel := metadatatest.SetupTelemetry()
+	tel := componenttest.NewTelemetry()
+	t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) })
 	telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings())
 	require.NoError(t, err)
 	c := tracesConsumerGroupHandler{
@@ -268,74 +269,39 @@ func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) {
 	groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")}
 	close(groupClaim.messageChan)
 	wg.Wait()
-	tel.AssertMetrics(t, []metricdata.Metrics{
+	metadatatest.AssertEqualKafkaReceiverOffsetLag(t, tel, []metricdata.DataPoint[int64]{
 		{
-			Name:        "otelcol_kafka_receiver_offset_lag",
-			Unit:        "1",
-			Description: "Current offset lag",
-			Data: metricdata.Gauge[int64]{
-				DataPoints: []metricdata.DataPoint[int64]{
-					{
-						Value: 3,
-						Attributes: attribute.NewSet(
-							attribute.String("name", ""),
-							attribute.String("partition", "5"),
-						),
-					},
-				},
-			},
+			Value: 3,
+			Attributes: attribute.NewSet(
+				attribute.String("name", ""),
+				attribute.String("partition", "5"),
+			),
 		},
+	}, metricdatatest.IgnoreTimestamp())
+	metadatatest.AssertEqualKafkaReceiverCurrentOffset(t, tel, []metricdata.DataPoint[int64]{
 		{
-			Name:        "otelcol_kafka_receiver_current_offset",
-			Unit:        "1",
-			Description: "Current message offset",
-			Data: metricdata.Gauge[int64]{
-				DataPoints: []metricdata.DataPoint[int64]{
-					{
-						Value: 0,
-						Attributes: attribute.NewSet(
-							attribute.String("name", ""),
-							attribute.String("partition", "5"),
-						),
-					},
-				},
-			},
+			Value: 0,
+			Attributes: attribute.NewSet(
+				attribute.String("name", ""),
+				attribute.String("partition", "5"),
+			),
 		},
+	}, metricdatatest.IgnoreTimestamp())
+	metadatatest.AssertEqualKafkaReceiverMessages(t, tel, []metricdata.DataPoint[int64]{
 		{
-			Name:        "otelcol_kafka_receiver_messages",
-			Unit:        "1",
-			Description: "Number of received messages",
-			Data: metricdata.Sum[int64]{
-				Temporality: metricdata.CumulativeTemporality,
-				IsMonotonic: true,
-				DataPoints: []metricdata.DataPoint[int64]{
-					{
-						Value: 1,
-						Attributes: attribute.NewSet(
-							attribute.String("name", ""),
-							attribute.String("partition", "5"),
-						),
-					},
-				},
-			},
+			Value: 1,
+			Attributes: attribute.NewSet(
+				attribute.String("name", ""),
+				attribute.String("partition", "5"),
+			),
 		},
+	}, metricdatatest.IgnoreTimestamp())
+	metadatatest.AssertEqualKafkaReceiverUnmarshalFailedSpans(t, tel, []metricdata.DataPoint[int64]{
 		{
-			Name:        "otelcol_kafka_receiver_unmarshal_failed_spans",
-			Unit:        "1",
-			Description: "Number of spans failed to be unmarshaled",
-			Data: metricdata.Sum[int64]{
-				Temporality: metricdata.CumulativeTemporality,
-				IsMonotonic: true,
-				DataPoints: []metricdata.DataPoint[int64]{
-					{
-						Value:      1,
-						Attributes: attribute.NewSet(attribute.String("name", "")),
-					},
-				},
-			},
+			Value:      1,
+			Attributes: attribute.NewSet(attribute.String("name", "")),
 		},
 	}, metricdatatest.IgnoreTimestamp())
-	require.NoError(t, tel.Shutdown(context.Background()))
 }
 
 func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) {
@@ -497,7 +463,8 @@ func TestMetricsReceiver_error(t *testing.T) {
 }
 
 func TestMetricsConsumerGroupHandler(t *testing.T) {
-	tel := metadatatest.SetupTelemetry()
+	tel := componenttest.NewTelemetry()
+	t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) })
 	telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings())
 	require.NoError(t, err)
 
@@ -536,11 +503,11 @@ func TestMetricsConsumerGroupHandler(t *testing.T) {
 	groupClaim.messageChan <- &sarama.ConsumerMessage{}
 	close(groupClaim.messageChan)
 	wg.Wait()
-	require.NoError(t, tel.Shutdown(context.Background()))
 }
 
 func TestMetricsConsumerGroupHandler_session_done(t *testing.T) {
-	tel := metadatatest.SetupTelemetry()
+	tel := componenttest.NewTelemetry()
+	t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) })
 	telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings())
 	require.NoError(t, err)
 
@@ -580,13 +547,13 @@ func TestMetricsConsumerGroupHandler_session_done(t *testing.T) {
 	groupClaim.messageChan <- &sarama.ConsumerMessage{}
 	cancelFunc()
 	wg.Wait()
-	require.NoError(t, tel.Shutdown(context.Background()))
 }
 
 func TestMetricsConsumerGroupHandler_error_unmarshal(t *testing.T) {
 	obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()})
 	require.NoError(t, err)
-	tel := metadatatest.SetupTelemetry()
+	tel := componenttest.NewTelemetry()
+	t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) })
 	telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings())
 	require.NoError(t, err)
 	c := metricsConsumerGroupHandler{
@@ -612,74 +579,39 @@ func TestMetricsConsumerGroupHandler_error_unmarshal(t *testing.T) {
 	groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")}
 	close(groupClaim.messageChan)
 	wg.Wait()
-	tel.AssertMetrics(t, []metricdata.Metrics{
+	metadatatest.AssertEqualKafkaReceiverOffsetLag(t, tel, []metricdata.DataPoint[int64]{
 		{
-			Name:        "otelcol_kafka_receiver_offset_lag",
-			Unit:        "1",
-			Description: "Current offset lag",
-			Data: metricdata.Gauge[int64]{
-				DataPoints: []metricdata.DataPoint[int64]{
-					{
-						Value: 3,
-						Attributes: attribute.NewSet(
-							attribute.String("name", ""),
-							attribute.String("partition", "5"),
-						),
-					},
-				},
-			},
+			Value: 3,
+			Attributes: attribute.NewSet(
+				attribute.String("name", ""),
+				attribute.String("partition", "5"),
+			),
 		},
+	}, metricdatatest.IgnoreTimestamp())
+	metadatatest.AssertEqualKafkaReceiverCurrentOffset(t, tel, []metricdata.DataPoint[int64]{
 		{
-			Name:        "otelcol_kafka_receiver_current_offset",
-			Unit:        "1",
-			Description: "Current message offset",
-			Data: metricdata.Gauge[int64]{
-				DataPoints: []metricdata.DataPoint[int64]{
-					{
-						Value: 0,
-						Attributes: attribute.NewSet(
-							attribute.String("name", ""),
-							attribute.String("partition", "5"),
-						),
-					},
-				},
-			},
+			Value: 0,
+			Attributes: attribute.NewSet(
+				attribute.String("name", ""),
+				attribute.String("partition", "5"),
+			),
 		},
+	}, metricdatatest.IgnoreTimestamp())
+	metadatatest.AssertEqualKafkaReceiverMessages(t, tel, []metricdata.DataPoint[int64]{
 		{
-			Name:        "otelcol_kafka_receiver_messages",
-			Unit:        "1",
-			Description: "Number of received messages",
-			Data: metricdata.Sum[int64]{
-				Temporality: metricdata.CumulativeTemporality,
-				IsMonotonic: true,
-				DataPoints: []metricdata.DataPoint[int64]{
-					{
-						Value: 1,
-						Attributes: attribute.NewSet(
-							attribute.String("name", ""),
-							attribute.String("partition", "5"),
-						),
-					},
-				},
-			},
+			Value: 1,
+			Attributes: attribute.NewSet(
+				attribute.String("name", ""),
+				attribute.String("partition", "5"),
+			),
 		},
+	}, metricdatatest.IgnoreTimestamp())
+	metadatatest.AssertEqualKafkaReceiverUnmarshalFailedMetricPoints(t, tel, []metricdata.DataPoint[int64]{
 		{
-			Name:        "otelcol_kafka_receiver_unmarshal_failed_metric_points",
-			Unit:        "1",
-			Description: "Number of metric points failed to be unmarshaled",
-			Data: metricdata.Sum[int64]{
-				Temporality: metricdata.CumulativeTemporality,
-				IsMonotonic: true,
-				DataPoints: []metricdata.DataPoint[int64]{
-					{
-						Value:      1,
-						Attributes: attribute.NewSet(attribute.String("name", "")),
-					},
-				},
-			},
+			Value:      1,
+			Attributes: attribute.NewSet(attribute.String("name", "")),
 		},
 	}, metricdatatest.IgnoreTimestamp())
-	require.NoError(t, tel.Shutdown(context.Background()))
 }
 
 func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
@@ -856,7 +788,8 @@ func TestLogsReceiver_error(t *testing.T) {
 }
 
 func TestLogsConsumerGroupHandler(t *testing.T) {
-	tel := metadatatest.SetupTelemetry()
+	tel := componenttest.NewTelemetry()
+	t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) })
 	telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings())
 	require.NoError(t, err)
 
@@ -895,11 +828,11 @@ func TestLogsConsumerGroupHandler(t *testing.T) {
 	groupClaim.messageChan <- &sarama.ConsumerMessage{}
 	close(groupClaim.messageChan)
 	wg.Wait()
-	require.NoError(t, tel.Shutdown(context.Background()))
 }
 
 func TestLogsConsumerGroupHandler_session_done(t *testing.T) {
-	tel := metadatatest.SetupTelemetry()
+	tel := componenttest.NewTelemetry()
+	t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) })
 	telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings())
 	require.NoError(t, err)
 
@@ -939,13 +872,13 @@ func TestLogsConsumerGroupHandler_session_done(t *testing.T) {
 	groupClaim.messageChan <- &sarama.ConsumerMessage{}
 	cancelFunc()
 	wg.Wait()
-	require.NoError(t, tel.Shutdown(context.Background()))
 }
 
 func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) {
 	obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()})
 	require.NoError(t, err)
-	tel := metadatatest.SetupTelemetry()
+	tel := componenttest.NewTelemetry()
+	t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) })
 	telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings())
 	require.NoError(t, err)
 	c := logsConsumerGroupHandler{
@@ -971,74 +904,41 @@ func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) {
 	groupClaim.messageChan <- &sarama.ConsumerMessage{Value: []byte("!@#")}
 	close(groupClaim.messageChan)
 	wg.Wait()
-	tel.AssertMetrics(t, []metricdata.Metrics{
+	metadatatest.AssertEqualKafkaReceiverOffsetLag(t, tel, []metricdata.DataPoint[int64]{
 		{
-			Name:        "otelcol_kafka_receiver_offset_lag",
-			Unit:        "1",
-			Description: "Current offset lag",
-			Data: metricdata.Gauge[int64]{
-				DataPoints: []metricdata.DataPoint[int64]{
-					{
-						Value: 3,
-						Attributes: attribute.NewSet(
-							attribute.String("name", ""),
-							attribute.String("partition", "5"),
-						),
-					},
-				},
-			},
+			Value: 3,
+			Attributes: attribute.NewSet(
+				attribute.String("name", ""),
+				attribute.String("partition", "5"),
+			),
 		},
+	}, metricdatatest.IgnoreTimestamp())
+	metadatatest.AssertEqualKafkaReceiverCurrentOffset(t, tel, []metricdata.DataPoint[int64]{
 		{
-			Name:        "otelcol_kafka_receiver_current_offset",
-			Unit:        "1",
-			Description: "Current message offset",
-			Data: metricdata.Gauge[int64]{
-				DataPoints: []metricdata.DataPoint[int64]{
-					{
-						Value: 0,
-						Attributes: attribute.NewSet(
-							attribute.String("name", ""),
-							attribute.String("partition", "5"),
-						),
-					},
-				},
-			},
+			Value: 0,
+			Attributes: attribute.NewSet(
+				attribute.String("name", ""),
+				attribute.String("partition", "5"),
+			),
 		},
+	}, metricdatatest.IgnoreTimestamp())
+	metadatatest.AssertEqualKafkaReceiverMessages(t, tel, []metricdata.DataPoint[int64]{
 		{
-			Name:        "otelcol_kafka_receiver_messages",
-			Unit:        "1",
-			Description: "Number of received messages",
-			Data: metricdata.Sum[int64]{
-				Temporality: metricdata.CumulativeTemporality,
-				IsMonotonic: true,
-				DataPoints: []metricdata.DataPoint[int64]{
-					{
-						Value: 1,
-						Attributes: attribute.NewSet(
-							attribute.String("name", ""),
-							attribute.String("partition", "5"),
-						),
-					},
-				},
-			},
+			Value: 1,
+			Attributes: attribute.NewSet(
+				attribute.String("name", ""),
+				attribute.String("partition", "5"),
+			),
 		},
+	}, metricdatatest.IgnoreTimestamp())
+	metadatatest.AssertEqualKafkaReceiverUnmarshalFailedLogRecords(t, tel, []metricdata.DataPoint[int64]{
 		{
-			Name:        "otelcol_kafka_receiver_unmarshal_failed_log_records",
-			Unit:        "1",
-			Description: "Number of log records failed to be unmarshaled",
-			Data: metricdata.Sum[int64]{
-				Temporality: metricdata.CumulativeTemporality,
-				IsMonotonic: true,
-				DataPoints: []metricdata.DataPoint[int64]{
-					{
-						Value:      1,
-						Attributes: attribute.NewSet(attribute.String("name", "")),
-					},
-				},
-			},
+			Value: 1,
+			Attributes: attribute.NewSet(
+				attribute.String("name", ""),
+			),
 		},
 	}, metricdatatest.IgnoreTimestamp())
-	require.NoError(t, tel.Shutdown(context.Background()))
 }
 
 func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
@@ -1354,42 +1254,21 @@ func (t *testConsumerGroup) ResumeAll() {
 	panic("implement me")
 }
 
-func assertInternalTelemetry(t *testing.T, tel metadatatest.Telemetry, partitionClose int64) {
-	wantMetrics := []metricdata.Metrics{
+func assertInternalTelemetry(t *testing.T, tel *componenttest.Telemetry, partitionClose int64) {
+	metadatatest.AssertEqualKafkaReceiverPartitionStart(t, tel, []metricdata.DataPoint[int64]{
 		{
-			Name:        "otelcol_kafka_receiver_partition_start",
-			Unit:        "1",
-			Description: "Number of started partitions",
-			Data: metricdata.Sum[int64]{
-				Temporality: metricdata.CumulativeTemporality,
-				IsMonotonic: true,
-				DataPoints: []metricdata.DataPoint[int64]{
-					{
-						Value:      1,
-						Attributes: attribute.NewSet(attribute.String("name", "")),
-					},
-				},
-			},
+			Value:      1,
+			Attributes: attribute.NewSet(attribute.String("name", "")),
 		},
-	}
+	}, metricdatatest.IgnoreTimestamp())
 	if partitionClose > 0 {
-		wantMetrics = append(wantMetrics, metricdata.Metrics{
-			Name:        "otelcol_kafka_receiver_partition_close",
-			Unit:        "1",
-			Description: "Number of finished partitions",
-			Data: metricdata.Sum[int64]{
-				Temporality: metricdata.CumulativeTemporality,
-				IsMonotonic: true,
-				DataPoints: []metricdata.DataPoint[int64]{
-					{
-						Value:      partitionClose,
-						Attributes: attribute.NewSet(attribute.String("name", "")),
-					},
-				},
+		metadatatest.AssertEqualKafkaReceiverPartitionClose(t, tel, []metricdata.DataPoint[int64]{
+			{
+				Value:      partitionClose,
+				Attributes: attribute.NewSet(attribute.String("name", "")),
 			},
-		})
+		}, metricdatatest.IgnoreTimestamp())
 	}
-	tel.AssertMetrics(t, wantMetrics, metricdatatest.IgnoreTimestamp())
 }
 
 func nopTelemetryBuilder(t *testing.T) *metadata.TelemetryBuilder {
diff --git a/receiver/solacereceiver/unmarshaller_test.go b/receiver/solacereceiver/unmarshaller_test.go
index 2c1141d86c11..de2153d1099a 100644
--- a/receiver/solacereceiver/unmarshaller_test.go
+++ b/receiver/solacereceiver/unmarshaller_test.go
@@ -10,6 +10,7 @@ import (
 	"github.com/Azure/go-amqp"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/component/componenttest"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/ptrace"
 	"go.opentelemetry.io/otel/attribute"
@@ -17,7 +18,6 @@ import (
 	"google.golang.org/protobuf/proto"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/metadata"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/metadatatest"
 	egress_v1 "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/model/egress/v1"
 	receive_v1 "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver/internal/model/receive/v1"
 )
@@ -321,10 +321,9 @@ func TestSolaceMessageUnmarshallerUnmarshal(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			tel := metadatatest.SetupTelemetry()
-			telemetryBuilder, err := metadata.NewTelemetryBuilder(tel.NewTelemetrySettings())
+			telemetryBuilder, err := metadata.NewTelemetryBuilder(componenttest.NewNopTelemetrySettings())
 			require.NoError(t, err)
-			metricAttr := attribute.NewSet(attribute.String("receiver_name", tel.NewSettings().ID.Name()))
+			metricAttr := attribute.NewSet(attribute.String("receiver_name", metadata.Type.String()))
 			u := newTracesUnmarshaller(zap.NewNop(), telemetryBuilder, metricAttr)
 			traces, err := u.unmarshal(tt.message)
 			if tt.err != nil {
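
Reviewer note: a minimal sketch (not part of the patch) of the test pattern this change converges on, using names taken from the filterprocessor hunks above; the test name is hypothetical, and it assumes the generated metadatatest helpers accept a *componenttest.Telemetry plus the expected data points, as they do throughout this diff.

	func TestFilterTelemetryPattern(t *testing.T) {
		// componenttest.NewTelemetry replaces the removed metadatatest.SetupTelemetry helper.
		tel := componenttest.NewTelemetry()
		t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) })

		// Settings for the component under test are derived from the test telemetry instance.
		processor, err := newFilterLogsProcessor(metadatatest.NewSettings(tel), &Config{
			Logs: LogFilters{LogConditions: []string{`IsMatch(body, "operationA")`}},
		})
		require.NoError(t, err)

		_, err = processor.processLogs(context.Background(), constructLogs())
		require.NoError(t, err)

		// Each metric is asserted through its generated helper instead of a
		// hand-built []metricdata.Metrics slice.
		metadatatest.AssertEqualProcessorFilterDatapointsFiltered(t, tel, []metricdata.DataPoint[int64]{
			{Value: 2, Attributes: attribute.NewSet(attribute.String("filter", "filter"))},
		}, metricdatatest.IgnoreTimestamp())
	}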