Skip to content

Commit

Permalink
[chore][pkg/ottl] Move cache management into ctxcache (open-telemetry…
Browse files Browse the repository at this point in the history
…#38228)

This consolidates logic for cache management into one place. The
experimental `WithCache` option makes this a little awkward but it works
by passing in a getter function for the cache `func() pcommon.Map` and
returning an `ottl.PathExpressionParser` which can be saved and used by
each context.
  • Loading branch information
djaglowski authored Feb 27, 2025
1 parent 81c5eee commit a9f5ff9
Show file tree
Hide file tree
Showing 16 changed files with 494 additions and 312 deletions.
47 changes: 47 additions & 0 deletions pkg/ottl/contexts/internal/ctxcache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ctxcache // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcache"

import (
"context"

"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxutil"
)

func PathExpressionParser[K any](cacheGetter func(K) pcommon.Map) ottl.PathExpressionParser[K] {
return func(path ottl.Path[K]) (ottl.GetSetter[K], error) {
if path.Keys() == nil {
return accessCache(cacheGetter), nil
}
return accessCacheKey(cacheGetter, path.Keys()), nil
}
}

func accessCache[K any](cacheGetter func(K) pcommon.Map) ottl.StandardGetSetter[K] {
return ottl.StandardGetSetter[K]{
Getter: func(_ context.Context, tCtx K) (any, error) {
return cacheGetter(tCtx), nil
},
Setter: func(_ context.Context, tCtx K, val any) error {
if m, ok := val.(pcommon.Map); ok {
m.CopyTo(cacheGetter(tCtx))
}
return nil
},
}
}

func accessCacheKey[K any](cacheGetter func(K) pcommon.Map, key []ottl.Key[K]) ottl.StandardGetSetter[K] {
return ottl.StandardGetSetter[K]{
Getter: func(ctx context.Context, tCtx K) (any, error) {
return ctxutil.GetMapValue(ctx, tCtx, cacheGetter(tCtx), key)
},
Setter: func(ctx context.Context, tCtx K, val any) error {
return ctxutil.SetMapValue(ctx, tCtx, cacheGetter(tCtx), key, val)
},
}
}
183 changes: 183 additions & 0 deletions pkg/ottl/contexts/internal/ctxcache/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ctxcache

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/pathtest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottltest"
)

func Test_PathExpressionParser(t *testing.T) {
cache := pcommon.NewMap()
cache.PutStr("key1", "value1")
cache.PutInt("key2", 42)

ctx := newTestContext(cache)

parser := PathExpressionParser(func(tCtx testContext) pcommon.Map {
return tCtx.getCache()
})

t.Run("access entire cache", func(t *testing.T) {
path := &pathtest.Path[testContext]{
N: "cache",
}

getter, err := parser(path)
require.NoError(t, err)

val, err := getter.Get(context.Background(), ctx)
require.NoError(t, err)
require.Equal(t, cache, val)

result, ok := val.(pcommon.Map)
require.True(t, ok)

v1, ok := result.Get("key1")
assert.True(t, ok)
assert.Equal(t, "value1", v1.Str())

v2, ok := result.Get("key2")
assert.True(t, ok)
assert.Equal(t, int64(42), v2.Int())
})

t.Run("access specific cache key", func(t *testing.T) {
path := &pathtest.Path[testContext]{
N: "cache",
KeySlice: []ottl.Key[testContext]{
&pathtest.Key[testContext]{
S: ottltest.Strp("key1"),
},
},
}

getter, err := parser(path)
require.NoError(t, err)

val, err := getter.Get(context.Background(), ctx)
require.NoError(t, err)
assert.Equal(t, "value1", val)
})

t.Run("modify entire cache", func(t *testing.T) {
path := &pathtest.Path[testContext]{
N: "cache",
}

getter, err := parser(path)
require.NoError(t, err)

newCache := pcommon.NewMap()
newCache.PutStr("new_key", "new_value")

err = getter.Set(context.Background(), ctx, newCache)
require.NoError(t, err)

val, ok := ctx.cache.Get("new_key")
assert.True(t, ok)
assert.Equal(t, "new_value", val.Str())
require.NotEqual(t, cache, val)
})

t.Run("modify specific cache key", func(t *testing.T) {
path := &pathtest.Path[testContext]{
N: "cache",
KeySlice: []ottl.Key[testContext]{
&pathtest.Key[testContext]{
S: ottltest.Strp("key1"),
},
},
}

getter, err := parser(path)
require.NoError(t, err)

err = getter.Set(context.Background(), ctx, "updated_value")
require.NoError(t, err)

v, ok := ctx.cache.Get("key1")
assert.True(t, ok)
assert.Equal(t, "updated_value", v.Str())
})

t.Run("add new cache key", func(t *testing.T) {
path := &pathtest.Path[testContext]{
N: "cache",
KeySlice: []ottl.Key[testContext]{
&pathtest.Key[testContext]{
S: ottltest.Strp("key3"),
},
},
}

getter, err := parser(path)
require.NoError(t, err)

err = getter.Set(context.Background(), ctx, "value3")
require.NoError(t, err)

v, ok := ctx.cache.Get("key3")
assert.True(t, ok)
assert.Equal(t, "value3", v.Str())
})

t.Run("access nested key", func(t *testing.T) {
nestedMap := pcommon.NewMap()
nestedMap.PutStr("nested_key", "nested_value")
parentMap := ctx.cache.PutEmptyMap("parent")
nestedMap.CopyTo(parentMap)

path := &pathtest.Path[testContext]{
N: "cache",
KeySlice: []ottl.Key[testContext]{
&pathtest.Key[testContext]{
S: ottltest.Strp("parent"),
},
&pathtest.Key[testContext]{
S: ottltest.Strp("nested_key"),
},
},
}

getter, err := parser(path)
require.NoError(t, err)

val, err := getter.Get(context.Background(), ctx)
require.NoError(t, err)
assert.Equal(t, "nested_value", val)

err = getter.Set(context.Background(), ctx, "updated_nested_value")
require.NoError(t, err)

parentValue, ok1 := ctx.cache.Get("parent")
require.True(t, ok1)
parentMap = parentValue.Map()
nestedValue, ok2 := parentMap.Get("nested_key")
require.True(t, ok2)
assert.Equal(t, "updated_nested_value", nestedValue.Str())
})
}

type testContext struct {
cache pcommon.Map
}

func (tCtx testContext) getCache() pcommon.Map {
return tCtx.cache
}

func newTestContext(cache pcommon.Map) testContext {
return testContext{
cache: cache,
}
}
54 changes: 15 additions & 39 deletions pkg/ottl/contexts/ottldatapoint/datapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package ottldatapoint // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"

import (
"context"
"errors"
"fmt"

Expand All @@ -14,6 +13,7 @@ import (
"go.uber.org/zap/zapcore"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxcache"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxdatapoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxerror"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/ctxmetric"
Expand Down Expand Up @@ -114,10 +114,6 @@ func (tCtx TransformContext) GetMetrics() pmetric.MetricSlice {
return tCtx.metrics
}

func (tCtx TransformContext) getCache() pcommon.Map {
return tCtx.cache
}

func (tCtx TransformContext) GetScopeSchemaURLItem() ctxutil.SchemaURLItem {
return tCtx.scopeMetrics
}
Expand All @@ -126,8 +122,20 @@ func (tCtx TransformContext) GetResourceSchemaURLItem() ctxutil.SchemaURLItem {
return tCtx.resourceMetrics
}

func getCache(tCtx TransformContext) pcommon.Map {
return tCtx.cache
}

type pathExpressionParser struct {
telemetrySettings component.TelemetrySettings
cacheGetSetter ottl.PathExpressionParser[TransformContext]
}

func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySettings component.TelemetrySettings, options ...Option) (ottl.Parser[TransformContext], error) {
pep := pathExpressionParser{telemetrySettings}
pep := pathExpressionParser{
telemetrySettings: telemetrySettings,
cacheGetSetter: ctxcache.PathExpressionParser(getCache),
}
p, err := ottl.NewParser[TransformContext](
functions,
pep.parsePath,
Expand Down Expand Up @@ -201,10 +209,6 @@ func parseEnum(val *ottl.EnumSymbol) (*ottl.Enum, error) {
return nil, fmt.Errorf("enum symbol not provided")
}

type pathExpressionParser struct {
telemetrySettings component.TelemetrySettings
}

func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) {
if path == nil {
return nil, ctxerror.New("nil", "nil", ctxdatapoint.Name, ctxdatapoint.DocRef)
Expand All @@ -222,10 +226,7 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot

switch path.Name() {
case "cache":
if path.Keys() == nil {
return accessCache(), nil
}
return accessCacheKey(path.Keys()), nil
return pep.cacheGetSetter(path)
default:
return ctxdatapoint.PathGetSetter(path)
}
Expand All @@ -247,28 +248,3 @@ func (pep *pathExpressionParser) parseHigherContextPath(context string, path ott
return nil, ctxerror.New(context, fullPath, ctxdatapoint.Name, ctxdatapoint.DocRef)
}
}

func accessCache() ottl.StandardGetSetter[TransformContext] {
return ottl.StandardGetSetter[TransformContext]{
Getter: func(_ context.Context, tCtx TransformContext) (any, error) {
return tCtx.getCache(), nil
},
Setter: func(_ context.Context, tCtx TransformContext, val any) error {
if m, ok := val.(pcommon.Map); ok {
m.CopyTo(tCtx.getCache())
}
return nil
},
}
}

func accessCacheKey(key []ottl.Key[TransformContext]) ottl.StandardGetSetter[TransformContext] {
return ottl.StandardGetSetter[TransformContext]{
Getter: func(ctx context.Context, tCtx TransformContext) (any, error) {
return ctxutil.GetMapValue[TransformContext](ctx, tCtx, tCtx.getCache(), key)
},
Setter: func(ctx context.Context, tCtx TransformContext, val any) error {
return ctxutil.SetMapValue[TransformContext](ctx, tCtx, tCtx.getCache(), key, val)
},
}
}
Loading

0 comments on commit a9f5ff9

Please sign in to comment.