Skip to content

Commit

Permalink
feat(sumologicschemaprocessor): add e2e tests for nesting processor
Browse files Browse the repository at this point in the history
  • Loading branch information
aboguszewski-sumo committed Dec 22, 2022
1 parent 97629b1 commit 0c1b10f
Showing 1 changed file with 296 additions and 0 deletions.
296 changes: 296 additions & 0 deletions pkg/processor/sumologicschemaprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -663,6 +664,280 @@ func TestTranslateTelegrafMetrics(t *testing.T) {
}
}

func TestNestingAttributesForLogs(t *testing.T) {
testCases := []struct {
name string
createLogs func() plog.Logs
test func(*testing.T, plog.Logs)
}{
{
name: "sample nesting",
createLogs: func() plog.Logs {
attrs := mapToPcommonMap(map[string]pcommon.Value{
"kubernetes.container_name": pcommon.NewValueStr("xyz"),
"kubernetes.host.name": pcommon.NewValueStr("the host"),
"kubernetes.host.address": pcommon.NewValueStr("127.0.0.1"),
"kubernetes.namespace_name": pcommon.NewValueStr("sumologic"),
"another_attr": pcommon.NewValueStr("42"),
})

resourceAttrs := mapToPcommonMap(map[string]pcommon.Value{
"a": mapToPcommonValue(map[string]pcommon.Value{
"b": mapToPcommonValue(map[string]pcommon.Value{
"c": pcommon.NewValueStr("d"),
}),
}),
"a.b.c": pcommon.NewValueStr("d"),
"d.g.e": pcommon.NewValueStr("l"),
"b.g.c": pcommon.NewValueStr("bonus"),
})

logs := plog.NewLogs()
resourceAttrs.CopyTo(logs.ResourceLogs().AppendEmpty().Resource().Attributes())
attrs.CopyTo(logs.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Attributes())

return logs
},
test: func(t *testing.T, l plog.Logs) {
expectedAttrs := mapToPcommonMap(map[string]pcommon.Value{
"kubernetes": mapToPcommonValue(map[string]pcommon.Value{
"container_name": pcommon.NewValueStr("xyz"),
"namespace_name": pcommon.NewValueStr("sumologic"),
"host": mapToPcommonValue(map[string]pcommon.Value{
"name": pcommon.NewValueStr("the host"),
"address": pcommon.NewValueStr("127.0.0.1"),
}),
}),
"another_attr": pcommon.NewValueStr("42"),
})

expectedResourceAttrs := mapToPcommonMap(map[string]pcommon.Value{
"a": mapToPcommonValue(map[string]pcommon.Value{
"b": mapToPcommonValue(map[string]pcommon.Value{
"c": pcommon.NewValueStr("d"),
}),
}),
"d": mapToPcommonValue(map[string]pcommon.Value{
"g": mapToPcommonValue(map[string]pcommon.Value{
"e": pcommon.NewValueStr("l"),
}),
}),
"b": mapToPcommonValue(map[string]pcommon.Value{
"g": mapToPcommonValue(map[string]pcommon.Value{
"c": pcommon.NewValueStr("bonus"),
}),
}),
})

require.Equal(t, 1, l.ResourceLogs().Len())
require.Equal(t, expectedResourceAttrs.AsRaw(), l.ResourceLogs().At(0).Resource().Attributes().AsRaw())

require.Equal(t, 1, l.ResourceLogs().At(0).ScopeLogs().Len())
require.Equal(t, 1, l.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len())
require.Equal(t, expectedAttrs.AsRaw(), l.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().AsRaw())
},
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
processor, err := newSumologicSchemaProcessor(newProcessorCreateSettings(), newNestAttributesConfig(".", true))
require.NoError(t, err)

logs := testCase.createLogs()

resultLogs, err := processor.processLogs(context.Background(), logs)
require.NoError(t, err)

testCase.test(t, resultLogs)
})
}
}

func TestNestingAttributesForMetrics(t *testing.T) {
testCases := []struct {
name string
createMetrics func() pmetric.Metrics
test func(*testing.T, pmetric.Metrics)
}{
{
name: "sample nesting",
createMetrics: func() pmetric.Metrics {
attrs := mapToPcommonMap(map[string]pcommon.Value{
"kubernetes.container_name": pcommon.NewValueStr("xyz"),
"kubernetes.host.name": pcommon.NewValueStr("the host"),
"kubernetes.host.address": pcommon.NewValueStr("127.0.0.1"),
"kubernetes.namespace_name": pcommon.NewValueStr("sumologic"),
"another_attr": pcommon.NewValueStr("42"),
})

resourceAttrs := mapToPcommonMap(map[string]pcommon.Value{
"a": mapToPcommonValue(map[string]pcommon.Value{
"b": mapToPcommonValue(map[string]pcommon.Value{
"c": pcommon.NewValueStr("d"),
}),
}),
"a.b.c": pcommon.NewValueStr("d"),
"d.g.e": pcommon.NewValueStr("l"),
"b.g.c": pcommon.NewValueStr("bonus"),
})

metrics := pmetric.NewMetrics()
resourceAttrs.CopyTo(metrics.ResourceMetrics().AppendEmpty().Resource().Attributes())
metric := metrics.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
attrs.CopyTo(metric.SetEmptySum().DataPoints().AppendEmpty().Attributes())

return metrics
},
test: func(t *testing.T, m pmetric.Metrics) {
expectedAttrs := mapToPcommonMap(map[string]pcommon.Value{
"kubernetes": mapToPcommonValue(map[string]pcommon.Value{
"container_name": pcommon.NewValueStr("xyz"),
"namespace_name": pcommon.NewValueStr("sumologic"),
"host": mapToPcommonValue(map[string]pcommon.Value{
"name": pcommon.NewValueStr("the host"),
"address": pcommon.NewValueStr("127.0.0.1"),
}),
}),
"another_attr": pcommon.NewValueStr("42"),
})

expectedResourceAttrs := mapToPcommonMap(map[string]pcommon.Value{
"a": mapToPcommonValue(map[string]pcommon.Value{
"b": mapToPcommonValue(map[string]pcommon.Value{
"c": pcommon.NewValueStr("d"),
}),
}),
"d": mapToPcommonValue(map[string]pcommon.Value{
"g": mapToPcommonValue(map[string]pcommon.Value{
"e": pcommon.NewValueStr("l"),
}),
}),
"b": mapToPcommonValue(map[string]pcommon.Value{
"g": mapToPcommonValue(map[string]pcommon.Value{
"c": pcommon.NewValueStr("bonus"),
}),
}),
})

require.Equal(t, 1, m.ResourceMetrics().Len())
require.Equal(t, expectedResourceAttrs.AsRaw(), m.ResourceMetrics().At(0).Resource().Attributes().AsRaw())

require.Equal(t, 1, m.ResourceMetrics().At(0).ScopeMetrics().Len())
require.Equal(t, 1, m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().Len())
require.Equal(t, 1, m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().Len())

sum := m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum()
require.Equal(t, expectedAttrs.AsRaw(), sum.DataPoints().At(0).Attributes().AsRaw())
},
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
processor, err := newSumologicSchemaProcessor(newProcessorCreateSettings(), newNestAttributesConfig(".", true))
require.NoError(t, err)

metrics := testCase.createMetrics()

resultMetrics, err := processor.processMetrics(context.Background(), metrics)
require.NoError(t, err)

testCase.test(t, pmetric.Metrics(resultMetrics))
})
}
}

func TestNestingAttributesForTraces(t *testing.T) {
testCases := []struct {
name string
createTraces func() ptrace.Traces
test func(*testing.T, ptrace.Traces)
}{
{
name: "sample nesting",
createTraces: func() ptrace.Traces {
attrs := mapToPcommonMap(map[string]pcommon.Value{
"kubernetes.container_name": pcommon.NewValueStr("xyz"),
"kubernetes.host.name": pcommon.NewValueStr("the host"),
"kubernetes.host.address": pcommon.NewValueStr("127.0.0.1"),
"kubernetes.namespace_name": pcommon.NewValueStr("sumologic"),
"another_attr": pcommon.NewValueStr("42"),
})

resourceAttrs := mapToPcommonMap(map[string]pcommon.Value{
"a": mapToPcommonValue(map[string]pcommon.Value{
"b": mapToPcommonValue(map[string]pcommon.Value{
"c": pcommon.NewValueStr("d"),
}),
}),
"a.b.c": pcommon.NewValueStr("d"),
"d.g.e": pcommon.NewValueStr("l"),
"b.g.c": pcommon.NewValueStr("bonus"),
})

traces := ptrace.NewTraces()
resourceAttrs.CopyTo(traces.ResourceSpans().AppendEmpty().Resource().Attributes())
attrs.CopyTo(traces.ResourceSpans().At(0).ScopeSpans().AppendEmpty().Spans().AppendEmpty().Attributes())

return traces
},
test: func(t *testing.T, l ptrace.Traces) {
expectedAttrs := mapToPcommonMap(map[string]pcommon.Value{
"kubernetes": mapToPcommonValue(map[string]pcommon.Value{
"container_name": pcommon.NewValueStr("xyz"),
"namespace_name": pcommon.NewValueStr("sumologic"),
"host": mapToPcommonValue(map[string]pcommon.Value{
"name": pcommon.NewValueStr("the host"),
"address": pcommon.NewValueStr("127.0.0.1"),
}),
}),
"another_attr": pcommon.NewValueStr("42"),
})

expectedResourceAttrs := mapToPcommonMap(map[string]pcommon.Value{
"a": mapToPcommonValue(map[string]pcommon.Value{
"b": mapToPcommonValue(map[string]pcommon.Value{
"c": pcommon.NewValueStr("d"),
}),
}),
"d": mapToPcommonValue(map[string]pcommon.Value{
"g": mapToPcommonValue(map[string]pcommon.Value{
"e": pcommon.NewValueStr("l"),
}),
}),
"b": mapToPcommonValue(map[string]pcommon.Value{
"g": mapToPcommonValue(map[string]pcommon.Value{
"c": pcommon.NewValueStr("bonus"),
}),
}),
})

require.Equal(t, 1, l.ResourceSpans().Len())
require.Equal(t, expectedResourceAttrs.AsRaw(), l.ResourceSpans().At(0).Resource().Attributes().AsRaw())

require.Equal(t, 1, l.ResourceSpans().At(0).ScopeSpans().Len())
require.Equal(t, 1, l.ResourceSpans().At(0).ScopeSpans().At(0).Spans().Len())
require.Equal(t, expectedAttrs.AsRaw(), l.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().AsRaw())
},
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
processor, err := newSumologicSchemaProcessor(newProcessorCreateSettings(), newNestAttributesConfig(".", true))
require.NoError(t, err)

traces := testCase.createTraces()

resultTraces, err := processor.processTraces(context.Background(), traces)
require.NoError(t, err)

testCase.test(t, resultTraces)
})
}
}

func newProcessorCreateSettings() component.ProcessorCreateSettings {
return component.ProcessorCreateSettings{
TelemetrySettings: component.TelemetrySettings{
Expand All @@ -676,6 +951,9 @@ func newCloudNamespaceConfig(addCloudNamespace bool) *Config {
config.AddCloudNamespace = addCloudNamespace
config.TranslateAttributes = false
config.TranslateTelegrafAttributes = false
config.NestAttributes = &NestingProcessorConfig{
Enabled: false,
}
return config
}

Expand All @@ -684,6 +962,9 @@ func newTranslateAttributesConfig(translateAttributes bool) *Config {
config.AddCloudNamespace = false
config.TranslateAttributes = translateAttributes
config.TranslateTelegrafAttributes = false
config.NestAttributes = &NestingProcessorConfig{
Enabled: false,
}
return config
}

Expand All @@ -692,5 +973,20 @@ func newTranslateTelegrafAttributesConfig(translateTelegrafAttributes bool) *Con
config.AddCloudNamespace = false
config.TranslateAttributes = false
config.TranslateTelegrafAttributes = translateTelegrafAttributes
config.NestAttributes = &NestingProcessorConfig{
Enabled: false,
}
return config
}

func newNestAttributesConfig(separator string, enabled bool) *Config {
config := createDefaultConfig().(*Config)
config.AddCloudNamespace = false
config.TranslateAttributes = false
config.TranslateTelegrafAttributes = false
config.NestAttributes = &NestingProcessorConfig{
Separator: separator,
Enabled: enabled,
}
return config
}

0 comments on commit 0c1b10f

Please sign in to comment.