Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[schema processor] Part 3 - Modifiers and Revisions #12147 #17020

Merged
Merged
3 changes: 2 additions & 1 deletion processor/schemaprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ require (
go.opentelemetry.io/collector/confmap v0.78.1
go.opentelemetry.io/collector/consumer v0.78.1
go.opentelemetry.io/collector/pdata v1.0.0-rcv0012
go.opentelemetry.io/otel/schema v0.0.4
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
)

Expand Down Expand Up @@ -39,7 +41,6 @@ require (
go.opentelemetry.io/otel/metric v0.38.1 // indirect
go.opentelemetry.io/otel/trace v1.15.1 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions processor/schemaprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 13 additions & 5 deletions processor/schemaprocessor/internal/alias/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ type Resource interface {
Resource() pcommon.Resource
}

// Signal represents a subset of incoming pdata
// NamedSignal represents a subset of incoming pdata
// that can be updated using the schema processor
type Signal interface {
type NamedSignal interface {
Name() string

SetName(name string)
Expand All @@ -35,7 +35,15 @@ var (
_ Resource = (*pmetric.ResourceMetrics)(nil)
_ Resource = (*ptrace.ResourceSpans)(nil)

_ Signal = (*pmetric.Metric)(nil)
_ Signal = (*ptrace.Span)(nil)
_ Signal = (*ptrace.SpanEvent)(nil)
_ NamedSignal = (*pmetric.Metric)(nil)
_ NamedSignal = (*ptrace.Span)(nil)
_ NamedSignal = (*ptrace.SpanEvent)(nil)
)

// AttributeKey is a type alias of string to help
// make clear what the strings being stored represent
type AttributeKey = string

// SignalName is a type alias of a string to help
// make clear what a type field is being used for.
type SignalName = string
16 changes: 16 additions & 0 deletions processor/schemaprocessor/internal/alias/alias_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package alias

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestTypeAliases(t *testing.T) {
t.Parallel()

assert.IsType(t, (*string)(nil), (*AttributeKey)(nil))
assert.IsType(t, (*string)(nil), (*SignalName)(nil))
}
113 changes: 113 additions & 0 deletions processor/schemaprocessor/internal/migrate/attributes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package migrate // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate"

import (
"fmt"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/otel/schema/v1.0/ast"
"go.uber.org/multierr"
)

// AttributeChangeSet represents an unscoped entry that can be applied.
//
// The listed changes are duplicated twice
// to allow for simplified means of transition to or from a revision.
type AttributeChangeSet struct {
updates ast.AttributeMap
rollback ast.AttributeMap
}

// AttributeChangeSetSlice allows for `AttributeChangeSet`
// to be chained together as they are defined within the schema
// and be applied sequentially to ensure deterministic behavior.
type AttributeChangeSetSlice []*AttributeChangeSet
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved

// NewAttributeChangeSet allows for typed strings to be used as part
// of the invocation that will be converted into the default string type.
func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet {
attr := &AttributeChangeSet{
updates: make(map[string]string, len(mappings)),
rollback: make(map[string]string, len(mappings)),
}
for k, v := range mappings {
attr.updates[k] = v
attr.rollback[v] = k
}
return attr
}

func (a *AttributeChangeSet) Apply(attrs pcommon.Map) error {
return a.do(StateSelectorApply, attrs)
}

func (a *AttributeChangeSet) Rollback(attrs pcommon.Map) error {
return a.do(StateSelectorRollback, attrs)
}

func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error) {
var (
updated = make(map[string]struct{})
results = pcommon.NewMap()
)
attrs.Range(func(k string, v pcommon.Value) bool {
var (
key string
matched bool
)
switch ss {
case StateSelectorApply:
key, matched = a.updates[k]
case StateSelectorRollback:
key, matched = a.rollback[k]
}
if matched {
k, updated[key] = key, struct{}{}
} else {
// TODO: Since the spec hasn't decided the behavior on what
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Link to an issue for deciding this in the comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not yet, was raised by @tigrannajaryan in a comment so I will raise it in the morning on the spec repo (or if anyone else gets to it before me).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to make a decision on open-telemetry/opentelemetry-specification#3497
Depending on the outcome the performance may be significantly impacted.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this must be solved before marking the component as alpha and adding it to our official builds, but still I don't think it should block merging this PR in the interest of moving forward with other parts of the implementation

// should happen on a name conflict, this will assume
// the rewrite has priority and will set it to the original
// entry's value, not the existing value.
if _, overridden := updated[k]; overridden {
errs = multierr.Append(errs, fmt.Errorf("value %q already exists", k))
return true
}
}
v.CopyTo(results.PutEmpty(k))
return true
})
results.CopyTo(attrs)
return errs
}

// NewAttributeChangeSetSlice combines all the provided `AttributeChangeSets`
// and allows them to be executed in the provided order.
func NewAttributeChangeSetSlice(changes ...*AttributeChangeSet) *AttributeChangeSetSlice {
values := new(AttributeChangeSetSlice)
for _, c := range changes {
(*values) = append((*values), c)
}
return values
}

func (slice *AttributeChangeSetSlice) Apply(attrs pcommon.Map) error {
return slice.do(StateSelectorApply, attrs)
}

func (slice *AttributeChangeSetSlice) Rollback(attrs pcommon.Map) error {
return slice.do(StateSelectorRollback, attrs)
}

func (slice *AttributeChangeSetSlice) do(ss StateSelector, attrs pcommon.Map) (errs error) {
for i := 0; i < len(*slice); i++ {
switch ss {
case StateSelectorApply:
errs = multierr.Append(errs, (*slice)[i].Apply(attrs))
case StateSelectorRollback:
errs = multierr.Append(errs, (*slice)[len(*slice)-1-i].Rollback(attrs))
}
}
return errs
}
Loading