diff --git a/pkg/processor/sumologicschemaprocessor/processor_test.go b/pkg/processor/sumologicschemaprocessor/processor_test.go index 51dbf75f3e..17bfe855b4 100644 --- a/pkg/processor/sumologicschemaprocessor/processor_test.go +++ b/pkg/processor/sumologicschemaprocessor/processor_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -663,6 +664,280 @@ func TestTranslateTelegrafMetrics(t *testing.T) { } } +func TestNestingAttributesForLogs(t *testing.T) { + testCases := []struct { + name string + createLogs func() plog.Logs + test func(*testing.T, plog.Logs) + }{ + { + name: "sample nesting", + createLogs: func() plog.Logs { + attrs := mapToPcommonMap(map[string]pcommon.Value{ + "kubernetes.container_name": pcommon.NewValueStr("xyz"), + "kubernetes.host.name": pcommon.NewValueStr("the host"), + "kubernetes.host.address": pcommon.NewValueStr("127.0.0.1"), + "kubernetes.namespace_name": pcommon.NewValueStr("sumologic"), + "another_attr": pcommon.NewValueStr("42"), + }) + + resourceAttrs := mapToPcommonMap(map[string]pcommon.Value{ + "a": mapToPcommonValue(map[string]pcommon.Value{ + "b": mapToPcommonValue(map[string]pcommon.Value{ + "c": pcommon.NewValueStr("d"), + }), + }), + "a.b.c": pcommon.NewValueStr("d"), + "d.g.e": pcommon.NewValueStr("l"), + "b.g.c": pcommon.NewValueStr("bonus"), + }) + + logs := plog.NewLogs() + resourceAttrs.CopyTo(logs.ResourceLogs().AppendEmpty().Resource().Attributes()) + attrs.CopyTo(logs.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Attributes()) + + return logs + }, + test: func(t *testing.T, l plog.Logs) { + expectedAttrs := mapToPcommonMap(map[string]pcommon.Value{ + "kubernetes": mapToPcommonValue(map[string]pcommon.Value{ + "container_name": pcommon.NewValueStr("xyz"), + "namespace_name": pcommon.NewValueStr("sumologic"), + "host": mapToPcommonValue(map[string]pcommon.Value{ + "name": pcommon.NewValueStr("the host"), + "address": pcommon.NewValueStr("127.0.0.1"), + }), + }), + "another_attr": pcommon.NewValueStr("42"), + }) + + expectedResourceAttrs := mapToPcommonMap(map[string]pcommon.Value{ + "a": mapToPcommonValue(map[string]pcommon.Value{ + "b": mapToPcommonValue(map[string]pcommon.Value{ + "c": pcommon.NewValueStr("d"), + }), + }), + "d": mapToPcommonValue(map[string]pcommon.Value{ + "g": mapToPcommonValue(map[string]pcommon.Value{ + "e": pcommon.NewValueStr("l"), + }), + }), + "b": mapToPcommonValue(map[string]pcommon.Value{ + "g": mapToPcommonValue(map[string]pcommon.Value{ + "c": pcommon.NewValueStr("bonus"), + }), + }), + }) + + require.Equal(t, 1, l.ResourceLogs().Len()) + require.Equal(t, expectedResourceAttrs.AsRaw(), l.ResourceLogs().At(0).Resource().Attributes().AsRaw()) + + require.Equal(t, 1, l.ResourceLogs().At(0).ScopeLogs().Len()) + require.Equal(t, 1, l.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len()) + require.Equal(t, expectedAttrs.AsRaw(), l.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().AsRaw()) + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + processor, err := newSumologicSchemaProcessor(newProcessorCreateSettings(), newNestAttributesConfig(".", true)) + require.NoError(t, err) + + logs := testCase.createLogs() + + resultLogs, err := processor.processLogs(context.Background(), logs) + require.NoError(t, err) + + testCase.test(t, resultLogs) + }) + } +} + +func TestNestingAttributesForMetrics(t *testing.T) { + testCases := []struct { + name string + createMetrics func() pmetric.Metrics + test func(*testing.T, pmetric.Metrics) + }{ + { + name: "sample nesting", + createMetrics: func() pmetric.Metrics { + attrs := mapToPcommonMap(map[string]pcommon.Value{ + "kubernetes.container_name": pcommon.NewValueStr("xyz"), + "kubernetes.host.name": pcommon.NewValueStr("the host"), + "kubernetes.host.address": pcommon.NewValueStr("127.0.0.1"), + "kubernetes.namespace_name": pcommon.NewValueStr("sumologic"), + "another_attr": pcommon.NewValueStr("42"), + }) + + resourceAttrs := mapToPcommonMap(map[string]pcommon.Value{ + "a": mapToPcommonValue(map[string]pcommon.Value{ + "b": mapToPcommonValue(map[string]pcommon.Value{ + "c": pcommon.NewValueStr("d"), + }), + }), + "a.b.c": pcommon.NewValueStr("d"), + "d.g.e": pcommon.NewValueStr("l"), + "b.g.c": pcommon.NewValueStr("bonus"), + }) + + metrics := pmetric.NewMetrics() + resourceAttrs.CopyTo(metrics.ResourceMetrics().AppendEmpty().Resource().Attributes()) + metric := metrics.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + attrs.CopyTo(metric.SetEmptySum().DataPoints().AppendEmpty().Attributes()) + + return metrics + }, + test: func(t *testing.T, m pmetric.Metrics) { + expectedAttrs := mapToPcommonMap(map[string]pcommon.Value{ + "kubernetes": mapToPcommonValue(map[string]pcommon.Value{ + "container_name": pcommon.NewValueStr("xyz"), + "namespace_name": pcommon.NewValueStr("sumologic"), + "host": mapToPcommonValue(map[string]pcommon.Value{ + "name": pcommon.NewValueStr("the host"), + "address": pcommon.NewValueStr("127.0.0.1"), + }), + }), + "another_attr": pcommon.NewValueStr("42"), + }) + + expectedResourceAttrs := mapToPcommonMap(map[string]pcommon.Value{ + "a": mapToPcommonValue(map[string]pcommon.Value{ + "b": mapToPcommonValue(map[string]pcommon.Value{ + "c": pcommon.NewValueStr("d"), + }), + }), + "d": mapToPcommonValue(map[string]pcommon.Value{ + "g": mapToPcommonValue(map[string]pcommon.Value{ + "e": pcommon.NewValueStr("l"), + }), + }), + "b": mapToPcommonValue(map[string]pcommon.Value{ + "g": mapToPcommonValue(map[string]pcommon.Value{ + "c": pcommon.NewValueStr("bonus"), + }), + }), + }) + + require.Equal(t, 1, m.ResourceMetrics().Len()) + require.Equal(t, expectedResourceAttrs.AsRaw(), m.ResourceMetrics().At(0).Resource().Attributes().AsRaw()) + + require.Equal(t, 1, m.ResourceMetrics().At(0).ScopeMetrics().Len()) + require.Equal(t, 1, m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().Len()) + require.Equal(t, 1, m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len()) + + sum := m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum() + require.Equal(t, expectedAttrs.AsRaw(), sum.DataPoints().At(0).Attributes().AsRaw()) + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + processor, err := newSumologicSchemaProcessor(newProcessorCreateSettings(), newNestAttributesConfig(".", true)) + require.NoError(t, err) + + metrics := testCase.createMetrics() + + resultMetrics, err := processor.processMetrics(context.Background(), metrics) + require.NoError(t, err) + + testCase.test(t, pmetric.Metrics(resultMetrics)) + }) + } +} + +func TestNestingAttributesForTraces(t *testing.T) { + testCases := []struct { + name string + createTraces func() ptrace.Traces + test func(*testing.T, ptrace.Traces) + }{ + { + name: "sample nesting", + createTraces: func() ptrace.Traces { + attrs := mapToPcommonMap(map[string]pcommon.Value{ + "kubernetes.container_name": pcommon.NewValueStr("xyz"), + "kubernetes.host.name": pcommon.NewValueStr("the host"), + "kubernetes.host.address": pcommon.NewValueStr("127.0.0.1"), + "kubernetes.namespace_name": pcommon.NewValueStr("sumologic"), + "another_attr": pcommon.NewValueStr("42"), + }) + + resourceAttrs := mapToPcommonMap(map[string]pcommon.Value{ + "a": mapToPcommonValue(map[string]pcommon.Value{ + "b": mapToPcommonValue(map[string]pcommon.Value{ + "c": pcommon.NewValueStr("d"), + }), + }), + "a.b.c": pcommon.NewValueStr("d"), + "d.g.e": pcommon.NewValueStr("l"), + "b.g.c": pcommon.NewValueStr("bonus"), + }) + + traces := ptrace.NewTraces() + resourceAttrs.CopyTo(traces.ResourceSpans().AppendEmpty().Resource().Attributes()) + attrs.CopyTo(traces.ResourceSpans().At(0).ScopeSpans().AppendEmpty().Spans().AppendEmpty().Attributes()) + + return traces + }, + test: func(t *testing.T, l ptrace.Traces) { + expectedAttrs := mapToPcommonMap(map[string]pcommon.Value{ + "kubernetes": mapToPcommonValue(map[string]pcommon.Value{ + "container_name": pcommon.NewValueStr("xyz"), + "namespace_name": pcommon.NewValueStr("sumologic"), + "host": mapToPcommonValue(map[string]pcommon.Value{ + "name": pcommon.NewValueStr("the host"), + "address": pcommon.NewValueStr("127.0.0.1"), + }), + }), + "another_attr": pcommon.NewValueStr("42"), + }) + + expectedResourceAttrs := mapToPcommonMap(map[string]pcommon.Value{ + "a": mapToPcommonValue(map[string]pcommon.Value{ + "b": mapToPcommonValue(map[string]pcommon.Value{ + "c": pcommon.NewValueStr("d"), + }), + }), + "d": mapToPcommonValue(map[string]pcommon.Value{ + "g": mapToPcommonValue(map[string]pcommon.Value{ + "e": pcommon.NewValueStr("l"), + }), + }), + "b": mapToPcommonValue(map[string]pcommon.Value{ + "g": mapToPcommonValue(map[string]pcommon.Value{ + "c": pcommon.NewValueStr("bonus"), + }), + }), + }) + + require.Equal(t, 1, l.ResourceSpans().Len()) + require.Equal(t, expectedResourceAttrs.AsRaw(), l.ResourceSpans().At(0).Resource().Attributes().AsRaw()) + + require.Equal(t, 1, l.ResourceSpans().At(0).ScopeSpans().Len()) + require.Equal(t, 1, l.ResourceSpans().At(0).ScopeSpans().At(0).Spans().Len()) + require.Equal(t, expectedAttrs.AsRaw(), l.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().AsRaw()) + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + processor, err := newSumologicSchemaProcessor(newProcessorCreateSettings(), newNestAttributesConfig(".", true)) + require.NoError(t, err) + + traces := testCase.createTraces() + + resultTraces, err := processor.processTraces(context.Background(), traces) + require.NoError(t, err) + + testCase.test(t, resultTraces) + }) + } +} + func newProcessorCreateSettings() component.ProcessorCreateSettings { return component.ProcessorCreateSettings{ TelemetrySettings: component.TelemetrySettings{ @@ -676,6 +951,9 @@ func newCloudNamespaceConfig(addCloudNamespace bool) *Config { config.AddCloudNamespace = addCloudNamespace config.TranslateAttributes = false config.TranslateTelegrafAttributes = false + config.NestAttributes = &NestingProcessorConfig{ + Enabled: false, + } return config } @@ -684,6 +962,9 @@ func newTranslateAttributesConfig(translateAttributes bool) *Config { config.AddCloudNamespace = false config.TranslateAttributes = translateAttributes config.TranslateTelegrafAttributes = false + config.NestAttributes = &NestingProcessorConfig{ + Enabled: false, + } return config } @@ -692,5 +973,20 @@ func newTranslateTelegrafAttributesConfig(translateTelegrafAttributes bool) *Con config.AddCloudNamespace = false config.TranslateAttributes = false config.TranslateTelegrafAttributes = translateTelegrafAttributes + config.NestAttributes = &NestingProcessorConfig{ + Enabled: false, + } + return config +} + +func newNestAttributesConfig(separator string, enabled bool) *Config { + config := createDefaultConfig().(*Config) + config.AddCloudNamespace = false + config.TranslateAttributes = false + config.TranslateTelegrafAttributes = false + config.NestAttributes = &NestingProcessorConfig{ + Separator: separator, + Enabled: enabled, + } return config }