diff --git a/pkg/ottl/contexts/internal/ctxcache/cache.go b/pkg/ottl/contexts/internal/ctxcache/cache.go new file mode 100644 index 000000000000..9c1800b204db --- /dev/null +++ b/pkg/ottl/contexts/internal/ctxcache/cache.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ctxcache // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcache" + +import ( + "context" + + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxutil" +) + +func PathExpressionParser[K any](cacheGetter func(K) pcommon.Map) ottl.PathExpressionParser[K] { + return func(path ottl.Path[K]) (ottl.GetSetter[K], error) { + if path.Keys() == nil { + return accessCache(cacheGetter), nil + } + return accessCacheKey(cacheGetter, path.Keys()), nil + } +} + +func accessCache[K any](cacheGetter func(K) pcommon.Map) ottl.StandardGetSetter[K] { + return ottl.StandardGetSetter[K]{ + Getter: func(_ context.Context, tCtx K) (any, error) { + return cacheGetter(tCtx), nil + }, + Setter: func(_ context.Context, tCtx K, val any) error { + if m, ok := val.(pcommon.Map); ok { + m.CopyTo(cacheGetter(tCtx)) + } + return nil + }, + } +} + +func accessCacheKey[K any](cacheGetter func(K) pcommon.Map, key []ottl.Key[K]) ottl.StandardGetSetter[K] { + return ottl.StandardGetSetter[K]{ + Getter: func(ctx context.Context, tCtx K) (any, error) { + return ctxutil.GetMapValue(ctx, tCtx, cacheGetter(tCtx), key) + }, + Setter: func(ctx context.Context, tCtx K, val any) error { + return ctxutil.SetMapValue(ctx, tCtx, cacheGetter(tCtx), key, val) + }, + } +} diff --git a/pkg/ottl/contexts/internal/ctxcache/cache_test.go b/pkg/ottl/contexts/internal/ctxcache/cache_test.go new file mode 100644 index 000000000000..0e50c5488e31 --- /dev/null +++ b/pkg/ottl/contexts/internal/ctxcache/cache_test.go @@ -0,0 +1,183 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ctxcache + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/pathtest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottltest" +) + +func Test_PathExpressionParser(t *testing.T) { + cache := pcommon.NewMap() + cache.PutStr("key1", "value1") + cache.PutInt("key2", 42) + + ctx := newTestContext(cache) + + parser := PathExpressionParser(func(tCtx testContext) pcommon.Map { + return tCtx.getCache() + }) + + t.Run("access entire cache", func(t *testing.T) { + path := &pathtest.Path[testContext]{ + N: "cache", + } + + getter, err := parser(path) + require.NoError(t, err) + + val, err := getter.Get(context.Background(), ctx) + require.NoError(t, err) + require.Equal(t, cache, val) + + result, ok := val.(pcommon.Map) + require.True(t, ok) + + v1, ok := result.Get("key1") + assert.True(t, ok) + assert.Equal(t, "value1", v1.Str()) + + v2, ok := result.Get("key2") + assert.True(t, ok) + assert.Equal(t, int64(42), v2.Int()) + }) + + t.Run("access specific cache key", func(t *testing.T) { + path := &pathtest.Path[testContext]{ + N: "cache", + KeySlice: []ottl.Key[testContext]{ + &pathtest.Key[testContext]{ + S: ottltest.Strp("key1"), + }, + }, + } + + getter, err := parser(path) + require.NoError(t, err) + + val, err := getter.Get(context.Background(), ctx) + require.NoError(t, err) + assert.Equal(t, "value1", val) + }) + + t.Run("modify entire cache", func(t *testing.T) { + path := &pathtest.Path[testContext]{ + N: "cache", + } + + getter, err := parser(path) + require.NoError(t, err) + + newCache := pcommon.NewMap() + newCache.PutStr("new_key", "new_value") + + err = getter.Set(context.Background(), ctx, newCache) + require.NoError(t, err) + + val, ok := ctx.cache.Get("new_key") + assert.True(t, ok) + assert.Equal(t, "new_value", val.Str()) + require.NotEqual(t, cache, val) + }) + + t.Run("modify specific cache key", func(t *testing.T) { + path := &pathtest.Path[testContext]{ + N: "cache", + KeySlice: []ottl.Key[testContext]{ + &pathtest.Key[testContext]{ + S: ottltest.Strp("key1"), + }, + }, + } + + getter, err := parser(path) + require.NoError(t, err) + + err = getter.Set(context.Background(), ctx, "updated_value") + require.NoError(t, err) + + v, ok := ctx.cache.Get("key1") + assert.True(t, ok) + assert.Equal(t, "updated_value", v.Str()) + }) + + t.Run("add new cache key", func(t *testing.T) { + path := &pathtest.Path[testContext]{ + N: "cache", + KeySlice: []ottl.Key[testContext]{ + &pathtest.Key[testContext]{ + S: ottltest.Strp("key3"), + }, + }, + } + + getter, err := parser(path) + require.NoError(t, err) + + err = getter.Set(context.Background(), ctx, "value3") + require.NoError(t, err) + + v, ok := ctx.cache.Get("key3") + assert.True(t, ok) + assert.Equal(t, "value3", v.Str()) + }) + + t.Run("access nested key", func(t *testing.T) { + nestedMap := pcommon.NewMap() + nestedMap.PutStr("nested_key", "nested_value") + parentMap := ctx.cache.PutEmptyMap("parent") + nestedMap.CopyTo(parentMap) + + path := &pathtest.Path[testContext]{ + N: "cache", + KeySlice: []ottl.Key[testContext]{ + &pathtest.Key[testContext]{ + S: ottltest.Strp("parent"), + }, + &pathtest.Key[testContext]{ + S: ottltest.Strp("nested_key"), + }, + }, + } + + getter, err := parser(path) + require.NoError(t, err) + + val, err := getter.Get(context.Background(), ctx) + require.NoError(t, err) + assert.Equal(t, "nested_value", val) + + err = getter.Set(context.Background(), ctx, "updated_nested_value") + require.NoError(t, err) + + parentValue, ok1 := ctx.cache.Get("parent") + require.True(t, ok1) + parentMap = parentValue.Map() + nestedValue, ok2 := parentMap.Get("nested_key") + require.True(t, ok2) + assert.Equal(t, "updated_nested_value", nestedValue.Str()) + }) +} + +type testContext struct { + cache pcommon.Map +} + +func (tCtx testContext) getCache() pcommon.Map { + return tCtx.cache +} + +func newTestContext(cache pcommon.Map) testContext { + return testContext{ + cache: cache, + } +} diff --git a/pkg/ottl/contexts/ottldatapoint/datapoint.go b/pkg/ottl/contexts/ottldatapoint/datapoint.go index fa5126172914..9418b19d0b66 100644 --- a/pkg/ottl/contexts/ottldatapoint/datapoint.go +++ b/pkg/ottl/contexts/ottldatapoint/datapoint.go @@ -4,7 +4,6 @@ package ottldatapoint // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint" import ( - "context" "errors" "fmt" @@ -14,6 +13,7 @@ import ( "go.uber.org/zap/zapcore" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcache" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxdatapoint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxerror" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxmetric" @@ -114,10 +114,6 @@ func (tCtx TransformContext) GetMetrics() pmetric.MetricSlice { return tCtx.metrics } -func (tCtx TransformContext) getCache() pcommon.Map { - return tCtx.cache -} - func (tCtx TransformContext) GetScopeSchemaURLItem() ctxutil.SchemaURLItem { return tCtx.scopeMetrics } @@ -126,8 +122,20 @@ func (tCtx TransformContext) GetResourceSchemaURLItem() ctxutil.SchemaURLItem { return tCtx.resourceMetrics } +func getCache(tCtx TransformContext) pcommon.Map { + return tCtx.cache +} + +type pathExpressionParser struct { + telemetrySettings component.TelemetrySettings + cacheGetSetter ottl.PathExpressionParser[TransformContext] +} + func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySettings component.TelemetrySettings, options ...Option) (ottl.Parser[TransformContext], error) { - pep := pathExpressionParser{telemetrySettings} + pep := pathExpressionParser{ + telemetrySettings: telemetrySettings, + cacheGetSetter: ctxcache.PathExpressionParser(getCache), + } p, err := ottl.NewParser[TransformContext]( functions, pep.parsePath, @@ -201,10 +209,6 @@ func parseEnum(val *ottl.EnumSymbol) (*ottl.Enum, error) { return nil, fmt.Errorf("enum symbol not provided") } -type pathExpressionParser struct { - telemetrySettings component.TelemetrySettings -} - func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { if path == nil { return nil, ctxerror.New("nil", "nil", ctxdatapoint.Name, ctxdatapoint.DocRef) @@ -222,10 +226,7 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot switch path.Name() { case "cache": - if path.Keys() == nil { - return accessCache(), nil - } - return accessCacheKey(path.Keys()), nil + return pep.cacheGetSetter(path) default: return ctxdatapoint.PathGetSetter(path) } @@ -247,28 +248,3 @@ func (pep *pathExpressionParser) parseHigherContextPath(context string, path ott return nil, ctxerror.New(context, fullPath, ctxdatapoint.Name, ctxdatapoint.DocRef) } } - -func accessCache() ottl.StandardGetSetter[TransformContext] { - return ottl.StandardGetSetter[TransformContext]{ - Getter: func(_ context.Context, tCtx TransformContext) (any, error) { - return tCtx.getCache(), nil - }, - Setter: func(_ context.Context, tCtx TransformContext, val any) error { - if m, ok := val.(pcommon.Map); ok { - m.CopyTo(tCtx.getCache()) - } - return nil - }, - } -} - -func accessCacheKey(key []ottl.Key[TransformContext]) ottl.StandardGetSetter[TransformContext] { - return ottl.StandardGetSetter[TransformContext]{ - Getter: func(ctx context.Context, tCtx TransformContext) (any, error) { - return ctxutil.GetMapValue[TransformContext](ctx, tCtx, tCtx.getCache(), key) - }, - Setter: func(ctx context.Context, tCtx TransformContext, val any) error { - return ctxutil.SetMapValue[TransformContext](ctx, tCtx, tCtx.getCache(), key, val) - }, - } -} diff --git a/pkg/ottl/contexts/ottldatapoint/datapoint_test.go b/pkg/ottl/contexts/ottldatapoint/datapoint_test.go index b1ebfb0e4b0f..6cf8ba60c46e 100644 --- a/pkg/ottl/contexts/ottldatapoint/datapoint_test.go +++ b/pkg/ottl/contexts/ottldatapoint/datapoint_test.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcache" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxdatapoint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/pathtest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottltest" @@ -72,13 +73,19 @@ func Test_newPathGetSetter_Cache(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - pep := pathExpressionParser{} + testCache := pcommon.NewMap() + cacheGetter := func(_ TransformContext) pcommon.Map { + return testCache + } + pep := pathExpressionParser{ + cacheGetSetter: ctxcache.PathExpressionParser(cacheGetter), + } accessor, err := pep.parsePath(tt.path) assert.NoError(t, err) numberDataPoint := createNumberDataPointTelemetry(tt.valueType) - ctx := NewTransformContext(numberDataPoint, pmetric.NewMetric(), pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics()) + ctx := NewTransformContext(numberDataPoint, pmetric.NewMetric(), pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics(), WithCache(&testCache)) got, err := accessor.Get(context.Background(), ctx) assert.NoError(t, err) @@ -90,7 +97,7 @@ func Test_newPathGetSetter_Cache(t *testing.T) { exCache := pcommon.NewMap() tt.modified(exCache) - assert.Equal(t, exCache, ctx.getCache()) + assert.Equal(t, exCache, testCache) }) } } @@ -99,7 +106,7 @@ func Test_newPathGetSetter_WithCache(t *testing.T) { cacheValue := pcommon.NewMap() cacheValue.PutStr("test", "pass") - ctx := NewTransformContext( + tCtx := NewTransformContext( pmetric.NewNumberDataPoint(), pmetric.NewMetric(), pmetric.NewMetricSlice(), @@ -110,7 +117,7 @@ func Test_newPathGetSetter_WithCache(t *testing.T) { WithCache(&cacheValue), ) - assert.Equal(t, cacheValue, ctx.getCache()) + assert.Equal(t, cacheValue, getCache(tCtx)) } func Test_newPathGetSetter_NumberDataPoint(t *testing.T) { @@ -518,13 +525,19 @@ func Test_newPathGetSetter_NumberDataPoint(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - pep := pathExpressionParser{} + testCache := pcommon.NewMap() + cacheGetter := func(_ TransformContext) pcommon.Map { + return testCache + } + pep := pathExpressionParser{ + cacheGetSetter: ctxcache.PathExpressionParser(cacheGetter), + } accessor, err := pep.parsePath(tt.path) assert.NoError(t, err) numberDataPoint := createNumberDataPointTelemetry(tt.valueType) - ctx := NewTransformContext(numberDataPoint, pmetric.NewMetric(), pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics()) + ctx := NewTransformContext(numberDataPoint, pmetric.NewMetric(), pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics(), WithCache(&testCache)) got, err := accessor.Get(context.Background(), ctx) assert.NoError(t, err) @@ -962,13 +975,19 @@ func Test_newPathGetSetter_HistogramDataPoint(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - pep := pathExpressionParser{} + testCache := pcommon.NewMap() + cacheGetter := func(_ TransformContext) pcommon.Map { + return testCache + } + pep := pathExpressionParser{ + cacheGetSetter: ctxcache.PathExpressionParser(cacheGetter), + } accessor, err := pep.parsePath(tt.path) assert.NoError(t, err) histogramDataPoint := createHistogramDataPointTelemetry() - ctx := NewTransformContext(histogramDataPoint, pmetric.NewMetric(), pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics()) + ctx := NewTransformContext(histogramDataPoint, pmetric.NewMetric(), pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics(), WithCache(&testCache)) got, err := accessor.Get(context.Background(), ctx) assert.NoError(t, err) @@ -1490,13 +1509,19 @@ func Test_newPathGetSetter_ExpoHistogramDataPoint(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - pep := pathExpressionParser{} + testCache := pcommon.NewMap() + cacheGetter := func(_ TransformContext) pcommon.Map { + return testCache + } + pep := pathExpressionParser{ + cacheGetSetter: ctxcache.PathExpressionParser(cacheGetter), + } accessor, err := pep.parsePath(tt.path) assert.NoError(t, err) expoHistogramDataPoint := createExpoHistogramDataPointTelemetry() - ctx := NewTransformContext(expoHistogramDataPoint, pmetric.NewMetric(), pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics()) + ctx := NewTransformContext(expoHistogramDataPoint, pmetric.NewMetric(), pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics(), WithCache(&testCache)) got, err := accessor.Get(context.Background(), ctx) assert.NoError(t, err) @@ -1919,13 +1944,19 @@ func Test_newPathGetSetter_SummaryDataPoint(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - pep := pathExpressionParser{} + testCache := pcommon.NewMap() + cacheGetter := func(_ TransformContext) pcommon.Map { + return testCache + } + pep := pathExpressionParser{ + cacheGetSetter: ctxcache.PathExpressionParser(cacheGetter), + } accessor, err := pep.parsePath(tt.path) assert.NoError(t, err) summaryDataPoint := createSummaryDataPointTelemetry() - ctx := NewTransformContext(summaryDataPoint, pmetric.NewMetric(), pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics()) + ctx := NewTransformContext(summaryDataPoint, pmetric.NewMetric(), pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewScopeMetrics(), pmetric.NewResourceMetrics(), WithCache(&testCache)) got, err := accessor.Get(context.Background(), ctx) assert.NoError(t, err) diff --git a/pkg/ottl/contexts/ottllog/log.go b/pkg/ottl/contexts/ottllog/log.go index 35a75613ad23..a270663ad9dc 100644 --- a/pkg/ottl/contexts/ottllog/log.go +++ b/pkg/ottl/contexts/ottllog/log.go @@ -4,7 +4,6 @@ package ottllog // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" import ( - "context" "encoding/hex" "errors" "fmt" @@ -15,6 +14,7 @@ import ( "go.uber.org/zap/zapcore" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcache" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxerror" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxlog" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxresource" @@ -108,10 +108,6 @@ func (tCtx TransformContext) GetResource() pcommon.Resource { return tCtx.resource } -func (tCtx TransformContext) getCache() pcommon.Map { - return tCtx.cache -} - func (tCtx TransformContext) GetScopeSchemaURLItem() ctxutil.SchemaURLItem { return tCtx.scopeLogs } @@ -120,8 +116,20 @@ func (tCtx TransformContext) GetResourceSchemaURLItem() ctxutil.SchemaURLItem { return tCtx.resourceLogs } +func getCache(tCtx TransformContext) pcommon.Map { + return tCtx.cache +} + +type pathExpressionParser struct { + telemetrySettings component.TelemetrySettings + cacheGetSetter ottl.PathExpressionParser[TransformContext] +} + func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySettings component.TelemetrySettings, options ...Option) (ottl.Parser[TransformContext], error) { - pep := pathExpressionParser{telemetrySettings} + pep := pathExpressionParser{ + telemetrySettings: telemetrySettings, + cacheGetSetter: ctxcache.PathExpressionParser(getCache), + } p, err := ottl.NewParser[TransformContext]( functions, pep.parsePath, @@ -194,10 +202,6 @@ func parseEnum(val *ottl.EnumSymbol) (*ottl.Enum, error) { return nil, fmt.Errorf("enum symbol not provided") } -type pathExpressionParser struct { - telemetrySettings component.TelemetrySettings -} - func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { if path == nil { return nil, ctxerror.New("nil", "nil", ctxlog.Name, ctxlog.DocRef) @@ -213,10 +217,7 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot switch path.Name() { case "cache": - if path.Keys() == nil { - return accessCache(), nil - } - return accessCacheKey(path.Keys()), nil + return pep.cacheGetSetter(path) default: return ctxlog.PathGetSetter[TransformContext](path) } @@ -236,28 +237,3 @@ func (pep *pathExpressionParser) parseHigherContextPath(context string, path ott return nil, ctxerror.New(context, fullPath, ctxlog.Name, ctxlog.DocRef) } } - -func accessCache() ottl.StandardGetSetter[TransformContext] { - return ottl.StandardGetSetter[TransformContext]{ - Getter: func(_ context.Context, tCtx TransformContext) (any, error) { - return tCtx.getCache(), nil - }, - Setter: func(_ context.Context, tCtx TransformContext, val any) error { - if m, ok := val.(pcommon.Map); ok { - m.CopyTo(tCtx.getCache()) - } - return nil - }, - } -} - -func accessCacheKey(key []ottl.Key[TransformContext]) ottl.StandardGetSetter[TransformContext] { - return ottl.StandardGetSetter[TransformContext]{ - Getter: func(ctx context.Context, tCtx TransformContext) (any, error) { - return ctxutil.GetMapValue[TransformContext](ctx, tCtx, tCtx.getCache(), key) - }, - Setter: func(ctx context.Context, tCtx TransformContext, val any) error { - return ctxutil.SetMapValue[TransformContext](ctx, tCtx, tCtx.getCache(), key, val) - }, - } -} diff --git a/pkg/ottl/contexts/ottllog/log_test.go b/pkg/ottl/contexts/ottllog/log_test.go index 732e36e8675f..1b760b66c2c9 100644 --- a/pkg/ottl/contexts/ottllog/log_test.go +++ b/pkg/ottl/contexts/ottllog/log_test.go @@ -16,6 +16,7 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcache" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxlog" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/pathtest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottltest" @@ -610,18 +611,24 @@ func Test_newPathGetSetter(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - pep := pathExpressionParser{} + testCache := pcommon.NewMap() + cacheGetter := func(_ TransformContext) pcommon.Map { + return testCache + } + pep := pathExpressionParser{ + cacheGetSetter: ctxcache.PathExpressionParser(cacheGetter), + } accessor, err := pep.parsePath(tt.path) assert.NoError(t, err) log, il, resource := createTelemetry(tt.bodyType) - tCtx := NewTransformContext(log, il, resource, plog.NewScopeLogs(), plog.NewResourceLogs()) + tCtx := NewTransformContext(log, il, resource, plog.NewScopeLogs(), plog.NewResourceLogs(), WithCache(&testCache)) got, err := accessor.Get(context.Background(), tCtx) assert.NoError(t, err) assert.Equal(t, tt.orig, got) - tCtx = NewTransformContext(log, il, resource, plog.NewScopeLogs(), plog.NewResourceLogs()) + tCtx = NewTransformContext(log, il, resource, plog.NewScopeLogs(), plog.NewResourceLogs(), WithCache(&testCache)) err = accessor.Set(context.Background(), tCtx, tt.newVal) assert.NoError(t, err) @@ -632,7 +639,7 @@ func Test_newPathGetSetter(t *testing.T) { assert.Equal(t, exLog, log) assert.Equal(t, exIl, il) assert.Equal(t, exRes, resource) - assert.Equal(t, exCache, tCtx.getCache()) + assert.Equal(t, exCache, testCache) }) } } @@ -696,7 +703,7 @@ func Test_newPathGetSetter_WithCache(t *testing.T) { cacheValue := pcommon.NewMap() cacheValue.PutStr("test", "pass") - ctx := NewTransformContext( + tCtx := NewTransformContext( plog.NewLogRecord(), pcommon.NewInstrumentationScope(), pcommon.NewResource(), @@ -705,7 +712,7 @@ func Test_newPathGetSetter_WithCache(t *testing.T) { WithCache(&cacheValue), ) - assert.Equal(t, cacheValue, ctx.getCache()) + assert.Equal(t, cacheValue, getCache(tCtx)) } func createTelemetry(bodyType string) (plog.LogRecord, pcommon.InstrumentationScope, pcommon.Resource) { diff --git a/pkg/ottl/contexts/ottlmetric/metrics.go b/pkg/ottl/contexts/ottlmetric/metrics.go index ac2cf56427a3..b47a8d1b7520 100644 --- a/pkg/ottl/contexts/ottlmetric/metrics.go +++ b/pkg/ottl/contexts/ottlmetric/metrics.go @@ -4,7 +4,6 @@ package ottlmetric // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" import ( - "context" "fmt" "go.opentelemetry.io/collector/component" @@ -12,6 +11,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcache" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxerror" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxresource" @@ -83,10 +83,6 @@ func (tCtx TransformContext) GetResource() pcommon.Resource { return tCtx.resource } -func (tCtx TransformContext) getCache() pcommon.Map { - return tCtx.cache -} - func (tCtx TransformContext) GetScopeSchemaURLItem() ctxutil.SchemaURLItem { return tCtx.scopeMetrics } @@ -95,8 +91,20 @@ func (tCtx TransformContext) GetResourceSchemaURLItem() ctxutil.SchemaURLItem { return tCtx.resourceMetrics } +func getCache(tCtx TransformContext) pcommon.Map { + return tCtx.cache +} + +type pathExpressionParser struct { + telemetrySettings component.TelemetrySettings + cacheGetSetter ottl.PathExpressionParser[TransformContext] +} + func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySettings component.TelemetrySettings, options ...Option) (ottl.Parser[TransformContext], error) { - pep := pathExpressionParser{telemetrySettings} + pep := pathExpressionParser{ + telemetrySettings: telemetrySettings, + cacheGetSetter: ctxcache.PathExpressionParser(getCache), + } p, err := ottl.NewParser[TransformContext]( functions, pep.parsePath, @@ -169,10 +177,6 @@ func parseEnum(val *ottl.EnumSymbol) (*ottl.Enum, error) { return nil, fmt.Errorf("enum symbol not provided") } -type pathExpressionParser struct { - telemetrySettings component.TelemetrySettings -} - func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { if path == nil { return nil, ctxerror.New("nil", "nil", ctxmetric.Name, ctxmetric.DocRef) @@ -188,10 +192,7 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot switch path.Name() { case "cache": - if path.Keys() == nil { - return accessCache(), nil - } - return accessCacheKey(path.Keys()), nil + return pep.cacheGetSetter(path) default: return ctxmetric.PathGetSetter[TransformContext](path) } @@ -211,28 +212,3 @@ func (pep *pathExpressionParser) parseHigherContextPath(context string, path ott return nil, ctxerror.New(context, fullPath, ctxmetric.Name, ctxmetric.DocRef) } } - -func accessCache() ottl.StandardGetSetter[TransformContext] { - return ottl.StandardGetSetter[TransformContext]{ - Getter: func(_ context.Context, tCtx TransformContext) (any, error) { - return tCtx.getCache(), nil - }, - Setter: func(_ context.Context, tCtx TransformContext, val any) error { - if m, ok := val.(pcommon.Map); ok { - m.CopyTo(tCtx.getCache()) - } - return nil - }, - } -} - -func accessCacheKey(key []ottl.Key[TransformContext]) ottl.StandardGetSetter[TransformContext] { - return ottl.StandardGetSetter[TransformContext]{ - Getter: func(ctx context.Context, tCtx TransformContext) (any, error) { - return ctxutil.GetMapValue[TransformContext](ctx, tCtx, tCtx.getCache(), key) - }, - Setter: func(ctx context.Context, tCtx TransformContext, val any) error { - return ctxutil.SetMapValue[TransformContext](ctx, tCtx, tCtx.getCache(), key, val) - }, - } -} diff --git a/pkg/ottl/contexts/ottlmetric/metrics_test.go b/pkg/ottl/contexts/ottlmetric/metrics_test.go index 7b878f1b2800..4c9a058a199e 100644 --- a/pkg/ottl/contexts/ottlmetric/metrics_test.go +++ b/pkg/ottl/contexts/ottlmetric/metrics_test.go @@ -10,10 +10,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcache" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/pathtest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottltest" @@ -155,7 +157,16 @@ func Test_newPathGetSetter(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - pep := pathExpressionParser{} + // Create a controlled test cache + testCache := pcommon.NewMap() + cacheGetter := func(_ TransformContext) pcommon.Map { + return testCache + } + pep := pathExpressionParser{ + telemetrySettings: component.TelemetrySettings{}, + cacheGetSetter: ctxcache.PathExpressionParser(cacheGetter), + } + accessor, err := pep.parsePath(tt.path) assert.NoError(t, err) @@ -175,7 +186,7 @@ func Test_newPathGetSetter(t *testing.T) { tt.modified(exMetric, exCache) assert.Equal(t, exMetric, metric) - assert.Equal(t, exCache, ctx.getCache()) + assert.Equal(t, exCache, testCache) }) } } @@ -244,7 +255,7 @@ func Test_newPathGetSetter_WithCache(t *testing.T) { cacheValue := pcommon.NewMap() cacheValue.PutStr("test", "pass") - ctx := NewTransformContext( + tCtx := NewTransformContext( pmetric.NewMetric(), pmetric.NewMetricSlice(), pcommon.NewInstrumentationScope(), @@ -254,7 +265,7 @@ func Test_newPathGetSetter_WithCache(t *testing.T) { WithCache(&cacheValue), ) - assert.Equal(t, cacheValue, ctx.getCache()) + assert.Equal(t, cacheValue, getCache(tCtx)) } func createTelemetry() pmetric.Metric { diff --git a/pkg/ottl/contexts/ottlresource/resource.go b/pkg/ottl/contexts/ottlresource/resource.go index b74578d128a8..6c93cb02e04e 100644 --- a/pkg/ottl/contexts/ottlresource/resource.go +++ b/pkg/ottl/contexts/ottlresource/resource.go @@ -4,7 +4,6 @@ package ottlresource // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" import ( - "context" "errors" "fmt" @@ -13,6 +12,7 @@ import ( "go.uber.org/zap/zapcore" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcache" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxerror" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxresource" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxutil" @@ -68,16 +68,24 @@ func (tCtx TransformContext) GetResource() pcommon.Resource { return tCtx.resource } -func (tCtx TransformContext) getCache() pcommon.Map { +func (tCtx TransformContext) GetResourceSchemaURLItem() ctxutil.SchemaURLItem { + return tCtx.schemaURLItem +} + +func getCache(tCtx TransformContext) pcommon.Map { return tCtx.cache } -func (tCtx TransformContext) GetResourceSchemaURLItem() ctxutil.SchemaURLItem { - return tCtx.schemaURLItem +type pathExpressionParser struct { + telemetrySettings component.TelemetrySettings + cacheGetSetter ottl.PathExpressionParser[TransformContext] } func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySettings component.TelemetrySettings, options ...Option) (ottl.Parser[TransformContext], error) { - pep := pathExpressionParser{telemetrySettings} + pep := pathExpressionParser{ + telemetrySettings: telemetrySettings, + cacheGetSetter: ctxcache.PathExpressionParser[TransformContext](getCache), + } p, err := ottl.NewParser[TransformContext]( functions, pep.parsePath, @@ -140,10 +148,6 @@ func parseEnum(_ *ottl.EnumSymbol) (*ottl.Enum, error) { return nil, fmt.Errorf("resource context does not provide Enum support") } -type pathExpressionParser struct { - telemetrySettings component.TelemetrySettings -} - func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { if path == nil { return nil, fmt.Errorf("path cannot be nil") @@ -154,36 +158,8 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot switch path.Name() { case "cache": - if path.Keys() == nil { - return accessCache(), nil - } - return accessCacheKey(path.Keys()), nil + return pep.cacheGetSetter(path) default: return ctxresource.PathGetSetter[TransformContext](ContextName, path) } } - -func accessCache() ottl.StandardGetSetter[TransformContext] { - return ottl.StandardGetSetter[TransformContext]{ - Getter: func(_ context.Context, tCtx TransformContext) (any, error) { - return tCtx.getCache(), nil - }, - Setter: func(_ context.Context, tCtx TransformContext, val any) error { - if m, ok := val.(pcommon.Map); ok { - m.CopyTo(tCtx.getCache()) - } - return nil - }, - } -} - -func accessCacheKey(key []ottl.Key[TransformContext]) ottl.StandardGetSetter[TransformContext] { - return ottl.StandardGetSetter[TransformContext]{ - Getter: func(ctx context.Context, tCtx TransformContext) (any, error) { - return ctxutil.GetMapValue[TransformContext](ctx, tCtx, tCtx.getCache(), key) - }, - Setter: func(ctx context.Context, tCtx TransformContext, val any) error { - return ctxutil.SetMapValue[TransformContext](ctx, tCtx, tCtx.getCache(), key, val) - }, - } -} diff --git a/pkg/ottl/contexts/ottlresource/resource_test.go b/pkg/ottl/contexts/ottlresource/resource_test.go index 5878b8bd76d8..2a3e37044d51 100644 --- a/pkg/ottl/contexts/ottlresource/resource_test.go +++ b/pkg/ottl/contexts/ottlresource/resource_test.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcache" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/pathtest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottltest" ) @@ -374,13 +375,19 @@ func Test_newPathGetSetter(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - pep := pathExpressionParser{} + testCache := pcommon.NewMap() + cacheGetter := func(tCtx TransformContext) pcommon.Map { + return tCtx.cache + } + pep := pathExpressionParser{ + cacheGetSetter: ctxcache.PathExpressionParser(cacheGetter), + } accessor, err := pep.parsePath(tt.path) assert.NoError(t, err) resource := createTelemetry() - tCtx := NewTransformContext(resource, pmetric.NewResourceMetrics()) + tCtx := NewTransformContext(resource, pmetric.NewResourceMetrics(), WithCache(&testCache)) got, err := accessor.Get(context.Background(), tCtx) assert.NoError(t, err) assert.Equal(t, tt.orig, got) @@ -393,7 +400,7 @@ func Test_newPathGetSetter(t *testing.T) { tt.modified(exRes, exCache) assert.Equal(t, exRes, resource) - assert.Equal(t, exCache, tCtx.getCache()) + assert.Equal(t, exCache, testCache) }) } } @@ -402,13 +409,13 @@ func Test_newPathGetSetter_WithCache(t *testing.T) { cacheValue := pcommon.NewMap() cacheValue.PutStr("test", "pass") - ctx := NewTransformContext( + tCtx := NewTransformContext( pcommon.NewResource(), pmetric.NewResourceMetrics(), WithCache(&cacheValue), ) - assert.Equal(t, cacheValue, ctx.getCache()) + assert.Equal(t, cacheValue, getCache(tCtx)) } func createTelemetry() pcommon.Resource { diff --git a/pkg/ottl/contexts/ottlscope/scope.go b/pkg/ottl/contexts/ottlscope/scope.go index b6f2cace83d1..8a8ea25bfd87 100644 --- a/pkg/ottl/contexts/ottlscope/scope.go +++ b/pkg/ottl/contexts/ottlscope/scope.go @@ -4,7 +4,6 @@ package ottlscope // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" import ( - "context" "errors" "fmt" @@ -13,6 +12,7 @@ import ( "go.uber.org/zap/zapcore" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcache" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxerror" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxresource" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxscope" @@ -77,10 +77,6 @@ func (tCtx TransformContext) GetResource() pcommon.Resource { return tCtx.resource } -func (tCtx TransformContext) getCache() pcommon.Map { - return tCtx.cache -} - func (tCtx TransformContext) GetScopeSchemaURLItem() ctxutil.SchemaURLItem { return tCtx.schemaURLItem } @@ -89,8 +85,20 @@ func (tCtx TransformContext) GetResourceSchemaURLItem() ctxutil.SchemaURLItem { return tCtx.schemaURLItem } +type pathExpressionParser struct { + telemetrySettings component.TelemetrySettings + cacheGetSetter ottl.PathExpressionParser[TransformContext] +} + +func getCache(tCtx TransformContext) pcommon.Map { + return tCtx.cache +} + func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySettings component.TelemetrySettings, options ...Option) (ottl.Parser[TransformContext], error) { - pep := pathExpressionParser{telemetrySettings} + pep := pathExpressionParser{ + telemetrySettings: telemetrySettings, + cacheGetSetter: ctxcache.PathExpressionParser[TransformContext](getCache), + } p, err := ottl.NewParser[TransformContext]( functions, pep.parsePath, @@ -156,10 +164,6 @@ func parseEnum(_ *ottl.EnumSymbol) (*ottl.Enum, error) { return nil, fmt.Errorf("instrumentation scope context does not provide Enum support") } -type pathExpressionParser struct { - telemetrySettings component.TelemetrySettings -} - func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { if path == nil { return nil, ctxerror.New("nil", "nil", ctxscope.Name, ctxscope.DocRef) @@ -175,10 +179,7 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot switch path.Name() { case "cache": - if path.Keys() == nil { - return accessCache(), nil - } - return accessCacheKey(path.Keys()), nil + return pep.cacheGetSetter(path) default: return ctxscope.PathGetSetter[TransformContext](ContextName, path) } @@ -196,28 +197,3 @@ func (pep *pathExpressionParser) parseHigherContextPath(context string, path ott return nil, ctxerror.New(context, fullPath, ctxscope.Name, ctxscope.DocRef) } } - -func accessCache() ottl.StandardGetSetter[TransformContext] { - return ottl.StandardGetSetter[TransformContext]{ - Getter: func(_ context.Context, tCtx TransformContext) (any, error) { - return tCtx.getCache(), nil - }, - Setter: func(_ context.Context, tCtx TransformContext, val any) error { - if m, ok := val.(pcommon.Map); ok { - m.CopyTo(tCtx.getCache()) - } - return nil - }, - } -} - -func accessCacheKey(key []ottl.Key[TransformContext]) ottl.StandardGetSetter[TransformContext] { - return ottl.StandardGetSetter[TransformContext]{ - Getter: func(ctx context.Context, tCtx TransformContext) (any, error) { - return ctxutil.GetMapValue[TransformContext](ctx, tCtx, tCtx.getCache(), key) - }, - Setter: func(ctx context.Context, tCtx TransformContext, val any) error { - return ctxutil.SetMapValue[TransformContext](ctx, tCtx, tCtx.getCache(), key, val) - }, - } -} diff --git a/pkg/ottl/contexts/ottlscope/scope_test.go b/pkg/ottl/contexts/ottlscope/scope_test.go index 3e5ae16b4857..3ad4d1261598 100644 --- a/pkg/ottl/contexts/ottlscope/scope_test.go +++ b/pkg/ottl/contexts/ottlscope/scope_test.go @@ -11,10 +11,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcache" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/pathtest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottltest" ) @@ -398,13 +398,19 @@ func Test_newPathGetSetter(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - pep := pathExpressionParser{} + testCache := pcommon.NewMap() + cacheGetter := func(_ TransformContext) pcommon.Map { + return testCache + } + pep := pathExpressionParser{ + cacheGetSetter: ctxcache.PathExpressionParser(cacheGetter), + } accessor, err := pep.parsePath(tt.path) assert.NoError(t, err) - il, resource := createTelemetry() + is, res := createTelemetry() - tCtx := NewTransformContext(il, resource, plog.NewScopeLogs()) + tCtx := NewTransformContext(is, res, pmetric.NewResourceMetrics(), WithCache(&testCache)) got, err := accessor.Get(context.Background(), tCtx) assert.NoError(t, err) assert.Equal(t, tt.orig, got) @@ -412,13 +418,13 @@ func Test_newPathGetSetter(t *testing.T) { err = accessor.Set(context.Background(), tCtx, tt.newVal) assert.NoError(t, err) - exIl, exRes := createTelemetry() + exIs, exRes := createTelemetry() exCache := pcommon.NewMap() - tt.modified(exIl, exRes, exCache) + tt.modified(exIs, exRes, exCache) - assert.Equal(t, exIl, il) - assert.Equal(t, exRes, resource) - assert.Equal(t, exCache, tCtx.getCache()) + assert.Equal(t, exIs, is) + assert.Equal(t, exRes, res) + assert.Equal(t, exCache, testCache) }) } } @@ -426,7 +432,11 @@ func Test_newPathGetSetter(t *testing.T) { func Test_newPathGetSetter_higherContextPath(t *testing.T) { resource := pcommon.NewResource() resource.Attributes().PutStr("foo", "bar") - ctx := NewTransformContext(pcommon.NewInstrumentationScope(), resource, plog.NewScopeLogs()) + + scope := pcommon.NewInstrumentationScope() + scope.SetName("instrumentation_scope") + + ctx := NewTransformContext(scope, resource, pmetric.NewResourceMetrics()) tests := []struct { name string @@ -473,14 +483,14 @@ func Test_newPathGetSetter_WithCache(t *testing.T) { cacheValue := pcommon.NewMap() cacheValue.PutStr("test", "pass") - ctx := NewTransformContext( + tCtx := NewTransformContext( pcommon.NewInstrumentationScope(), pcommon.NewResource(), pmetric.NewResourceMetrics(), WithCache(&cacheValue), ) - assert.Equal(t, cacheValue, ctx.getCache()) + assert.Equal(t, cacheValue, getCache(tCtx)) } func createTelemetry() (pcommon.InstrumentationScope, pcommon.Resource) { diff --git a/pkg/ottl/contexts/ottlspan/span.go b/pkg/ottl/contexts/ottlspan/span.go index 0ad89a465a62..87b2aaa22c45 100644 --- a/pkg/ottl/contexts/ottlspan/span.go +++ b/pkg/ottl/contexts/ottlspan/span.go @@ -4,7 +4,6 @@ package ottlspan // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" import ( - "context" "errors" "fmt" @@ -14,6 +13,7 @@ import ( "go.uber.org/zap/zapcore" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcache" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxerror" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxresource" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxscope" @@ -89,10 +89,6 @@ func (tCtx TransformContext) GetResource() pcommon.Resource { return tCtx.resource } -func (tCtx TransformContext) getCache() pcommon.Map { - return tCtx.cache -} - func (tCtx TransformContext) GetResourceSchemaURLItem() ctxutil.SchemaURLItem { return tCtx.resourceSpans } @@ -101,8 +97,15 @@ func (tCtx TransformContext) GetScopeSchemaURLItem() ctxutil.SchemaURLItem { return tCtx.scopeSpans } +func getCache(tCtx TransformContext) pcommon.Map { + return tCtx.cache +} + func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySettings component.TelemetrySettings, options ...Option) (ottl.Parser[TransformContext], error) { - pep := pathExpressionParser{telemetrySettings} + pep := pathExpressionParser{ + telemetrySettings: telemetrySettings, + cacheGetSetter: ctxcache.PathExpressionParser[TransformContext](getCache), + } p, err := ottl.NewParser[TransformContext]( functions, pep.parsePath, @@ -177,6 +180,7 @@ func parseEnum(val *ottl.EnumSymbol) (*ottl.Enum, error) { type pathExpressionParser struct { telemetrySettings component.TelemetrySettings + cacheGetSetter ottl.PathExpressionParser[TransformContext] } func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { @@ -194,10 +198,7 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot switch path.Name() { case "cache": - if path.Keys() == nil { - return accessCache(), nil - } - return accessCacheKey(path.Keys()), nil + return pep.cacheGetSetter(path) default: return ctxspan.PathGetSetter[TransformContext](ctxspan.Name, path) } @@ -217,28 +218,3 @@ func (pep *pathExpressionParser) parseHigherContextPath(context string, path ott return nil, ctxerror.New(context, fullPath, ctxspan.Name, ctxspan.DocRef) } } - -func accessCache() ottl.StandardGetSetter[TransformContext] { - return ottl.StandardGetSetter[TransformContext]{ - Getter: func(_ context.Context, tCtx TransformContext) (any, error) { - return tCtx.getCache(), nil - }, - Setter: func(_ context.Context, tCtx TransformContext, val any) error { - if m, ok := val.(pcommon.Map); ok { - m.CopyTo(tCtx.getCache()) - } - return nil - }, - } -} - -func accessCacheKey(key []ottl.Key[TransformContext]) ottl.StandardGetSetter[TransformContext] { - return ottl.StandardGetSetter[TransformContext]{ - Getter: func(ctx context.Context, tCtx TransformContext) (any, error) { - return ctxutil.GetMapValue[TransformContext](ctx, tCtx, tCtx.getCache(), key) - }, - Setter: func(ctx context.Context, tCtx TransformContext, val any) error { - return ctxutil.SetMapValue[TransformContext](ctx, tCtx, tCtx.getCache(), key, val) - }, - } -} diff --git a/pkg/ottl/contexts/ottlspan/span_test.go b/pkg/ottl/contexts/ottlspan/span_test.go index a711faa62407..30f671796708 100644 --- a/pkg/ottl/contexts/ottlspan/span_test.go +++ b/pkg/ottl/contexts/ottlspan/span_test.go @@ -12,10 +12,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcache" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxspan" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/pathtest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottltest" @@ -665,7 +667,18 @@ func Test_newPathGetSetter(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - pep := pathExpressionParser{} + // Create a controlled cache map for testing + testCache := pcommon.NewMap() + cacheGetter := func(_ TransformContext) pcommon.Map { + return testCache + } + + // Initialize parser with a cache getter that returns our controlled map + pep := pathExpressionParser{ + telemetrySettings: component.TelemetrySettings{}, + cacheGetSetter: ctxcache.PathExpressionParser(cacheGetter), + } + accessor, err := pep.parsePath(tt.path) assert.NoError(t, err) @@ -687,7 +700,7 @@ func Test_newPathGetSetter(t *testing.T) { assert.Equal(t, exSpan, span) assert.Equal(t, exIl, il) assert.Equal(t, exRes, resource) - assert.Equal(t, exCache, tCtx.getCache()) + assert.Equal(t, exCache, testCache) }) } } @@ -752,6 +765,22 @@ func Test_newPathGetSetter_higherContextPath(t *testing.T) { } } +func Test_newPathGetSetter_WithCache(t *testing.T) { + cacheValue := pcommon.NewMap() + cacheValue.PutStr("test", "pass") + + tCtx := NewTransformContext( + ptrace.NewSpan(), + pcommon.NewInstrumentationScope(), + pcommon.NewResource(), + ptrace.NewScopeSpans(), + ptrace.NewResourceSpans(), + WithCache(&cacheValue), + ) + + assert.Equal(t, cacheValue, getCache(tCtx)) +} + func createTelemetry() (ptrace.Span, pcommon.InstrumentationScope, pcommon.Resource) { span := ptrace.NewSpan() span.SetTraceID(traceID) diff --git a/pkg/ottl/contexts/ottlspanevent/span_events.go b/pkg/ottl/contexts/ottlspanevent/span_events.go index 7276fc0a6d77..9e026c43a164 100644 --- a/pkg/ottl/contexts/ottlspanevent/span_events.go +++ b/pkg/ottl/contexts/ottlspanevent/span_events.go @@ -14,6 +14,7 @@ import ( "go.uber.org/zap/zapcore" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcache" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxerror" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxresource" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxscope" @@ -109,10 +110,6 @@ func (tCtx TransformContext) GetResource() pcommon.Resource { return tCtx.resource } -func (tCtx TransformContext) getCache() pcommon.Map { - return tCtx.cache -} - func (tCtx TransformContext) GetScopeSchemaURLItem() ctxutil.SchemaURLItem { return tCtx.scopeSpans } @@ -131,8 +128,20 @@ func (tCtx TransformContext) GetEventIndex() (int64, error) { return 0, errors.New("no 'event_index' property has been set") } +func getCache(tCtx TransformContext) pcommon.Map { + return tCtx.cache +} + +type pathExpressionParser struct { + telemetrySettings component.TelemetrySettings + cacheGetSetter ottl.PathExpressionParser[TransformContext] +} + func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySettings component.TelemetrySettings, options ...Option) (ottl.Parser[TransformContext], error) { - pep := pathExpressionParser{telemetrySettings} + pep := pathExpressionParser{ + telemetrySettings: telemetrySettings, + cacheGetSetter: ctxcache.PathExpressionParser[TransformContext](getCache), + } p, err := ottl.NewParser[TransformContext]( functions, pep.parsePath, @@ -206,10 +215,6 @@ func parseEnum(val *ottl.EnumSymbol) (*ottl.Enum, error) { return nil, fmt.Errorf("enum symbol not provided") } -type pathExpressionParser struct { - telemetrySettings component.TelemetrySettings -} - func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { if path == nil { return nil, ctxerror.New("nil", "nil", ctxspanevent.Name, ctxspanevent.DocRef) @@ -228,10 +233,7 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot switch path.Name() { case "cache": - if path.Keys() == nil { - return accessCache(), nil - } - return accessCacheKey(path.Keys()), nil + return pep.cacheGetSetter(path) case "event_index": return accessSpanEventIndex(), nil default: @@ -256,31 +258,6 @@ func (pep *pathExpressionParser) parseHigherContextPath(context string, path ott } } -func accessCache() ottl.StandardGetSetter[TransformContext] { - return ottl.StandardGetSetter[TransformContext]{ - Getter: func(_ context.Context, tCtx TransformContext) (any, error) { - return tCtx.getCache(), nil - }, - Setter: func(_ context.Context, tCtx TransformContext, val any) error { - if m, ok := val.(pcommon.Map); ok { - m.CopyTo(tCtx.getCache()) - } - return nil - }, - } -} - -func accessCacheKey(key []ottl.Key[TransformContext]) ottl.StandardGetSetter[TransformContext] { - return ottl.StandardGetSetter[TransformContext]{ - Getter: func(ctx context.Context, tCtx TransformContext) (any, error) { - return ctxutil.GetMapValue[TransformContext](ctx, tCtx, tCtx.getCache(), key) - }, - Setter: func(ctx context.Context, tCtx TransformContext, val any) error { - return ctxutil.SetMapValue[TransformContext](ctx, tCtx, tCtx.getCache(), key, val) - }, - } -} - func accessSpanEventIndex() ottl.StandardGetSetter[TransformContext] { return ottl.StandardGetSetter[TransformContext]{ Getter: func(_ context.Context, tCtx TransformContext) (any, error) { diff --git a/pkg/ottl/contexts/ottlspanevent/span_events_test.go b/pkg/ottl/contexts/ottlspanevent/span_events_test.go index 5f0a857d9d34..0c3f02d6510e 100644 --- a/pkg/ottl/contexts/ottlspanevent/span_events_test.go +++ b/pkg/ottl/contexts/ottlspanevent/span_events_test.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcache" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxspanevent" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/pathtest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottltest" @@ -431,13 +432,19 @@ func Test_newPathGetSetter(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - pep := pathExpressionParser{} + testCache := pcommon.NewMap() + cacheGetter := func(_ TransformContext) pcommon.Map { + return testCache + } + pep := pathExpressionParser{ + cacheGetSetter: ctxcache.PathExpressionParser(cacheGetter), + } accessor, err := pep.parsePath(tt.path) assert.NoError(t, err) spanEvent, span, il, resource := createTelemetry() - tCtx := NewTransformContext(spanEvent, span, il, resource, ptrace.NewScopeSpans(), ptrace.NewResourceSpans(), WithEventIndex(1)) + tCtx := NewTransformContext(spanEvent, span, il, resource, ptrace.NewScopeSpans(), ptrace.NewResourceSpans(), WithEventIndex(1), WithCache(&testCache)) got, err := accessor.Get(context.Background(), tCtx) assert.NoError(t, err) @@ -457,7 +464,7 @@ func Test_newPathGetSetter(t *testing.T) { assert.Equal(t, exSpan, span) assert.Equal(t, exIl, il) assert.Equal(t, exRes, resource) - assert.Equal(t, exCache, tCtx.getCache()) + assert.Equal(t, exCache, testCache) }) } } @@ -591,6 +598,23 @@ func Test_setAndGetEventIndex(t *testing.T) { } } +func Test_newPathGetSetter_WithCache(t *testing.T) { + cacheValue := pcommon.NewMap() + cacheValue.PutStr("test", "pass") + + tCtx := NewTransformContext( + ptrace.NewSpanEvent(), + ptrace.NewSpan(), + pcommon.NewInstrumentationScope(), + pcommon.NewResource(), + ptrace.NewScopeSpans(), + ptrace.NewResourceSpans(), + WithCache(&cacheValue), + ) + + assert.Equal(t, cacheValue, getCache(tCtx)) +} + func createTelemetry() (ptrace.SpanEvent, ptrace.Span, pcommon.InstrumentationScope, pcommon.Resource) { span := ptrace.NewSpan() span.SetName("test")