Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move model pdata interfaces to pdata, expose them publicly #3455

Merged
merged 1 commit into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions consumer/pdata/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,13 @@
// is non-nil. Several structures also provide New*Slice functions that allow creating
// more than one instance of the struct more efficiently instead of calling New*
// repeatedly. Use it where appropriate.
//
// This package also provides common ways for decoding serialized bytes into protocol-specific
// in-memory data models (e.g. Zipkin Span). These data models can then be translated to pdata
// representations. Similarly, pdata types can be translated from a data model which can then
// be serialized into bytes.
//
// * Encoding: Common interfaces for serializing/deserializing bytes from/to protocol-specific data models.
// * Translation: Common interfaces for translating protocol-specific data models from/to pdata types.
// * Marshaling: Common higher level APIs that do both encoding and translation of bytes and data model if going directly pdata types to bytes.
package pdata
2 changes: 1 addition & 1 deletion internal/model/errors.go → consumer/pdata/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package model
package pdata

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package model
package pdata

import (
"testing"
Expand Down
96 changes: 95 additions & 1 deletion consumer/pdata/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,106 @@
package pdata

import (
"fmt"

"go.opentelemetry.io/collector/internal"
otlpcollectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
otlplogs "go.opentelemetry.io/collector/internal/data/protogen/logs/v1"
)

// This file defines in-memory data structures to represent logs.
// LogsDecoder is an interface to decode bytes into protocol-specific data model.
type LogsDecoder interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put these marshalling/decoding/etc interfaces in a separate package or separate file?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can discuss about separate files, but more importantly is do we want same package or separate? Advantage for same package is that if we also put OTLP favors in pdata we don't need the InternalRep at all

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tigrannajaryan Are you OK with having them in the same package for the reason above?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are OK, I would merge the PR as is, and will followup since is a non-breaking change, just to ensure we release these interfaces in the next release so I can start using them in contrib.

// DecodeLogs decodes bytes into protocol-specific data model.
// If the error is not nil, the returned interface cannot be used.
DecodeLogs(buf []byte) (interface{}, error)
}

// LogsEncoder is an interface to encode protocol-specific data model into bytes.
type LogsEncoder interface {
// EncodeLogs encodes protocol-specific data model into bytes.
// If the error is not nil, the returned bytes slice cannot be used.
EncodeLogs(model interface{}) ([]byte, error)
}

// FromLogsTranslator is an interface to translate pdata.Logs into protocol-specific data model.
type FromLogsTranslator interface {
// FromLogs translates pdata.Logs into protocol-specific data model.
// If the error is not nil, the returned pdata.Logs cannot be used.
FromLogs(ld Logs) (interface{}, error)
}

// ToLogsTranslator is an interface to translate a protocol-specific data model into pdata.Traces.
type ToLogsTranslator interface {
// ToLogs translates a protocol-specific data model into pdata.Logs.
// If the error is not nil, the returned pdata.Logs cannot be used.
ToLogs(src interface{}) (Logs, error)
}

// LogsMarshaler marshals pdata.Logs into bytes.
type LogsMarshaler interface {
// Marshal the given pdata.Logs into bytes.
// If the error is not nil, the returned bytes slice cannot be used.
Marshal(td Logs) ([]byte, error)
}

type logsMarshaler struct {
encoder LogsEncoder
translator FromLogsTranslator
}

// NewLogsMarshaler returns a new LogsMarshaler.
func NewLogsMarshaler(encoder LogsEncoder, translator FromLogsTranslator) LogsMarshaler {
return &logsMarshaler{
encoder: encoder,
translator: translator,
}
}

// Marshal pdata.Logs into bytes.
func (t *logsMarshaler) Marshal(td Logs) ([]byte, error) {
model, err := t.translator.FromLogs(td)
if err != nil {
return nil, fmt.Errorf("converting pdata to model failed: %w", err)
}
buf, err := t.encoder.EncodeLogs(model)
if err != nil {
return nil, fmt.Errorf("marshal failed: %w", err)
}
return buf, nil
}

// LogsUnmarshaler unmarshalls bytes into pdata.Logs.
type LogsUnmarshaler interface {
// Unmarshal the given bytes into pdata.Logs.
// If the error is not nil, the returned pdata.Logs cannot be used.
Unmarshal(buf []byte) (Logs, error)
}

type logsUnmarshaler struct {
decoder LogsDecoder
translator ToLogsTranslator
}

// NewLogsUnmarshaler returns a new LogsUnmarshaler.
func NewLogsUnmarshaler(decoder LogsDecoder, translator ToLogsTranslator) LogsUnmarshaler {
return &logsUnmarshaler{
decoder: decoder,
translator: translator,
}
}

// Unmarshal bytes into pdata.Logs. On error pdata.Logs is invalid.
func (t *logsUnmarshaler) Unmarshal(buf []byte) (Logs, error) {
model, err := t.decoder.DecodeLogs(buf)
if err != nil {
return Logs{}, fmt.Errorf("unmarshal failed: %w", err)
}
td, err := t.translator.ToLogs(model)
if err != nil {
return Logs{}, fmt.Errorf("converting model to pdata failed: %w", err)
}
return td, nil
}

// Logs is the top-level struct that is propagated through the logs pipeline.
//
Expand Down
96 changes: 96 additions & 0 deletions consumer/pdata/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package pdata

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -25,6 +26,101 @@ import (
otlplogs "go.opentelemetry.io/collector/internal/data/protogen/logs/v1"
)

func TestLogsMarshal_TranslationError(t *testing.T) {
translator := &mockTranslator{}
encoder := &mockEncoder{}

lm := NewLogsMarshaler(encoder, translator)
ld := NewLogs()

translator.On("FromLogs", ld).Return(nil, errors.New("translation failed"))

_, err := lm.Marshal(ld)
assert.Error(t, err)
assert.EqualError(t, err, "converting pdata to model failed: translation failed")
}

func TestLogsMarshal_SerializeError(t *testing.T) {
translator := &mockTranslator{}
encoder := &mockEncoder{}

lm := NewLogsMarshaler(encoder, translator)
ld := NewLogs()
expectedModel := struct{}{}

translator.On("FromLogs", ld).Return(expectedModel, nil)
encoder.On("EncodeLogs", expectedModel).Return(nil, errors.New("serialization failed"))

_, err := lm.Marshal(ld)
assert.Error(t, err)
assert.EqualError(t, err, "marshal failed: serialization failed")
}

func TestLogsMarshal_Encode(t *testing.T) {
translator := &mockTranslator{}
encoder := &mockEncoder{}

lm := NewLogsMarshaler(encoder, translator)
expectedLogs := NewLogs()
expectedBytes := []byte{1, 2, 3}
expectedModel := struct{}{}

translator.On("FromLogs", expectedLogs).Return(expectedModel, nil)
encoder.On("EncodeLogs", expectedModel).Return(expectedBytes, nil)

actualBytes, err := lm.Marshal(expectedLogs)
assert.NoError(t, err)
assert.Equal(t, expectedBytes, actualBytes)
}

func TestLogsUnmarshal_EncodingError(t *testing.T) {
translator := &mockTranslator{}
encoder := &mockEncoder{}

lu := NewLogsUnmarshaler(encoder, translator)
expectedBytes := []byte{1, 2, 3}
expectedModel := struct{}{}

encoder.On("DecodeLogs", expectedBytes).Return(expectedModel, errors.New("decode failed"))

_, err := lu.Unmarshal(expectedBytes)
assert.Error(t, err)
assert.EqualError(t, err, "unmarshal failed: decode failed")
}

func TestLogsUnmarshal_TranslationError(t *testing.T) {
translator := &mockTranslator{}
encoder := &mockEncoder{}

lu := NewLogsUnmarshaler(encoder, translator)
expectedBytes := []byte{1, 2, 3}
expectedModel := struct{}{}

encoder.On("DecodeLogs", expectedBytes).Return(expectedModel, nil)
translator.On("ToLogs", expectedModel).Return(NewLogs(), errors.New("translation failed"))

_, err := lu.Unmarshal(expectedBytes)
assert.Error(t, err)
assert.EqualError(t, err, "converting model to pdata failed: translation failed")
}

func TestLogsUnmarshal_Decode(t *testing.T) {
translator := &mockTranslator{}
encoder := &mockEncoder{}

lu := NewLogsUnmarshaler(encoder, translator)
expectedLogs := NewLogs()
expectedBytes := []byte{1, 2, 3}
expectedModel := struct{}{}

encoder.On("DecodeLogs", expectedBytes).Return(expectedModel, nil)
translator.On("ToLogs", expectedModel).Return(expectedLogs, nil)

actualLogs, err := lu.Unmarshal(expectedBytes)
assert.NoError(t, err)
assert.Equal(t, expectedLogs, actualLogs)
}

func TestLogRecordCount(t *testing.T) {
md := NewLogs()
assert.EqualValues(t, 0, md.LogRecordCount())
Expand Down
124 changes: 110 additions & 14 deletions consumer/pdata/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,105 @@
package pdata

import (
"fmt"

"go.opentelemetry.io/collector/internal"
otlpcollectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
otlpmetrics "go.opentelemetry.io/collector/internal/data/protogen/metrics/v1"
)

// AggregationTemporality defines how a metric aggregator reports aggregated values.
// It describes how those values relate to the time interval over which they are aggregated.
type AggregationTemporality int32
// MetricsDecoder is an interface to decode bytes into protocol-specific data model.
type MetricsDecoder interface {
// DecodeMetrics decodes bytes into protocol-specific data model.
// If the error is not nil, the returned interface cannot be used.
DecodeMetrics(buf []byte) (interface{}, error)
}

const (
// AggregationTemporalityUnspecified is the default AggregationTemporality, it MUST NOT be used.
AggregationTemporalityUnspecified = AggregationTemporality(otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_UNSPECIFIED)
// AggregationTemporalityDelta is an AggregationTemporality for a metric aggregator which reports changes since last report time.
AggregationTemporalityDelta = AggregationTemporality(otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA)
// AggregationTemporalityCumulative is an AggregationTemporality for a metric aggregator which reports changes since a fixed start time.
AggregationTemporalityCumulative = AggregationTemporality(otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE)
)
// MetricsEncoder is an interface to encode protocol-specific data model into bytes.
type MetricsEncoder interface {
// EncodeMetrics encodes protocol-specific data model into bytes.
// If the error is not nil, the returned bytes slice cannot be used.
EncodeMetrics(model interface{}) ([]byte, error)
}

// String returns the string representation of the AggregationTemporality.
func (at AggregationTemporality) String() string {
return otlpmetrics.AggregationTemporality(at).String()
// FromMetricsTranslator is an interface to translate pdata.Metrics into protocol-specific data model.
type FromMetricsTranslator interface {
// FromMetrics translates pdata.Metrics into protocol-specific data model.
// If the error is not nil, the returned pdata.Metrics cannot be used.
FromMetrics(md Metrics) (interface{}, error)
}

// ToMetricsTranslator is an interface to translate a protocol-specific data model into pdata.Traces.
type ToMetricsTranslator interface {
// ToMetrics translates a protocol-specific data model into pdata.Metrics.
// If the error is not nil, the returned pdata.Metrics cannot be used.
ToMetrics(src interface{}) (Metrics, error)
}

// MetricsMarshaler marshals pdata.Metrics into bytes.
type MetricsMarshaler interface {
// Marshal the given pdata.Metrics into bytes.
// If the error is not nil, the returned bytes slice cannot be used.
Marshal(td Metrics) ([]byte, error)
}

type metricsMarshaler struct {
encoder MetricsEncoder
translator FromMetricsTranslator
}

// NewMetricsMarshaler returns a new MetricsMarshaler.
func NewMetricsMarshaler(encoder MetricsEncoder, translator FromMetricsTranslator) MetricsMarshaler {
return &metricsMarshaler{
encoder: encoder,
translator: translator,
}
}

// Marshal pdata.Metrics into bytes.
func (t *metricsMarshaler) Marshal(td Metrics) ([]byte, error) {
model, err := t.translator.FromMetrics(td)
if err != nil {
return nil, fmt.Errorf("converting pdata to model failed: %w", err)
}
buf, err := t.encoder.EncodeMetrics(model)
if err != nil {
return nil, fmt.Errorf("marshal failed: %w", err)
}
return buf, nil
}

// MetricsUnmarshaler unmarshalls bytes into pdata.Metrics.
type MetricsUnmarshaler interface {
// Unmarshal the given bytes into pdata.Metrics.
// If the error is not nil, the returned pdata.Metrics cannot be used.
Unmarshal(buf []byte) (Metrics, error)
}

type metricsUnmarshaler struct {
decoder MetricsDecoder
translator ToMetricsTranslator
}

// NewMetricsUnmarshaler returns a new MetricsUnmarshaler.
func NewMetricsUnmarshaler(decoder MetricsDecoder, translator ToMetricsTranslator) MetricsUnmarshaler {
return &metricsUnmarshaler{
decoder: decoder,
translator: translator,
}
}

// Unmarshal bytes into pdata.Metrics. On error pdata.Metrics is invalid.
func (t *metricsUnmarshaler) Unmarshal(buf []byte) (Metrics, error) {
model, err := t.decoder.DecodeMetrics(buf)
if err != nil {
return Metrics{}, fmt.Errorf("unmarshal failed: %w", err)
}
td, err := t.translator.ToMetrics(model)
if err != nil {
return Metrics{}, fmt.Errorf("converting model to pdata failed: %w", err)
}
return td, nil
}

// Metrics is an opaque interface that allows transition to the new internal Metrics data, but also facilitates the
Expand Down Expand Up @@ -313,3 +391,21 @@ func copyData(src, dest *otlpmetrics.Metric) {
dest.Data = data
}
}

// AggregationTemporality defines how a metric aggregator reports aggregated values.
// It describes how those values relate to the time interval over which they are aggregated.
type AggregationTemporality int32

const (
// AggregationTemporalityUnspecified is the default AggregationTemporality, it MUST NOT be used.
AggregationTemporalityUnspecified = AggregationTemporality(otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_UNSPECIFIED)
// AggregationTemporalityDelta is an AggregationTemporality for a metric aggregator which reports changes since last report time.
AggregationTemporalityDelta = AggregationTemporality(otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA)
// AggregationTemporalityCumulative is an AggregationTemporality for a metric aggregator which reports changes since a fixed start time.
AggregationTemporalityCumulative = AggregationTemporality(otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE)
)

// String returns the string representation of the AggregationTemporality.
func (at AggregationTemporality) String() string {
return otlpmetrics.AggregationTemporality(at).String()
}
Loading