diff --git a/consumer/pdata/doc.go b/consumer/pdata/doc.go index 0318c3addd4..dbd02bd0433 100644 --- a/consumer/pdata/doc.go +++ b/consumer/pdata/doc.go @@ -29,4 +29,13 @@ // is non-nil. Several structures also provide New*Slice functions that allow creating // more than one instance of the struct more efficiently instead of calling New* // repeatedly. Use it where appropriate. +// +// This package also provides common ways for decoding serialized bytes into protocol-specific +// in-memory data models (e.g. Zipkin Span). These data models can then be translated to pdata +// representations. Similarly, pdata types can be translated from a data model which can then +// be serialized into bytes. +// +// * Encoding: Common interfaces for serializing/deserializing bytes from/to protocol-specific data models. +// * Translation: Common interfaces for translating protocol-specific data models from/to pdata types. +// * Marshaling: Common higher level APIs that do both encoding and translation of bytes and data model if going directly pdata types to bytes. package pdata diff --git a/internal/model/errors.go b/consumer/pdata/errors.go similarity index 98% rename from internal/model/errors.go rename to consumer/pdata/errors.go index 0ce6051dffc..4ba7eaa8d72 100644 --- a/internal/model/errors.go +++ b/consumer/pdata/errors.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package model +package pdata import ( "fmt" diff --git a/internal/model/errors_test.go b/consumer/pdata/errors_test.go similarity index 98% rename from internal/model/errors_test.go rename to consumer/pdata/errors_test.go index ad13c4538e5..d6a99aebef8 100644 --- a/internal/model/errors_test.go +++ b/consumer/pdata/errors_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package model +package pdata import ( "testing" diff --git a/consumer/pdata/logs.go b/consumer/pdata/logs.go index e18a487f2bb..1cdafdc06f5 100644 --- a/consumer/pdata/logs.go +++ b/consumer/pdata/logs.go @@ -15,12 +15,106 @@ package pdata import ( + "fmt" + "go.opentelemetry.io/collector/internal" otlpcollectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1" otlplogs "go.opentelemetry.io/collector/internal/data/protogen/logs/v1" ) -// This file defines in-memory data structures to represent logs. +// LogsDecoder is an interface to decode bytes into protocol-specific data model. +type LogsDecoder interface { + // DecodeLogs decodes bytes into protocol-specific data model. + // If the error is not nil, the returned interface cannot be used. + DecodeLogs(buf []byte) (interface{}, error) +} + +// LogsEncoder is an interface to encode protocol-specific data model into bytes. +type LogsEncoder interface { + // EncodeLogs encodes protocol-specific data model into bytes. + // If the error is not nil, the returned bytes slice cannot be used. + EncodeLogs(model interface{}) ([]byte, error) +} + +// FromLogsTranslator is an interface to translate pdata.Logs into protocol-specific data model. +type FromLogsTranslator interface { + // FromLogs translates pdata.Logs into protocol-specific data model. + // If the error is not nil, the returned pdata.Logs cannot be used. + FromLogs(ld Logs) (interface{}, error) +} + +// ToLogsTranslator is an interface to translate a protocol-specific data model into pdata.Traces. +type ToLogsTranslator interface { + // ToLogs translates a protocol-specific data model into pdata.Logs. + // If the error is not nil, the returned pdata.Logs cannot be used. + ToLogs(src interface{}) (Logs, error) +} + +// LogsMarshaler marshals pdata.Logs into bytes. +type LogsMarshaler interface { + // Marshal the given pdata.Logs into bytes. + // If the error is not nil, the returned bytes slice cannot be used. + Marshal(td Logs) ([]byte, error) +} + +type logsMarshaler struct { + encoder LogsEncoder + translator FromLogsTranslator +} + +// NewLogsMarshaler returns a new LogsMarshaler. +func NewLogsMarshaler(encoder LogsEncoder, translator FromLogsTranslator) LogsMarshaler { + return &logsMarshaler{ + encoder: encoder, + translator: translator, + } +} + +// Marshal pdata.Logs into bytes. +func (t *logsMarshaler) Marshal(td Logs) ([]byte, error) { + model, err := t.translator.FromLogs(td) + if err != nil { + return nil, fmt.Errorf("converting pdata to model failed: %w", err) + } + buf, err := t.encoder.EncodeLogs(model) + if err != nil { + return nil, fmt.Errorf("marshal failed: %w", err) + } + return buf, nil +} + +// LogsUnmarshaler unmarshalls bytes into pdata.Logs. +type LogsUnmarshaler interface { + // Unmarshal the given bytes into pdata.Logs. + // If the error is not nil, the returned pdata.Logs cannot be used. + Unmarshal(buf []byte) (Logs, error) +} + +type logsUnmarshaler struct { + decoder LogsDecoder + translator ToLogsTranslator +} + +// NewLogsUnmarshaler returns a new LogsUnmarshaler. +func NewLogsUnmarshaler(decoder LogsDecoder, translator ToLogsTranslator) LogsUnmarshaler { + return &logsUnmarshaler{ + decoder: decoder, + translator: translator, + } +} + +// Unmarshal bytes into pdata.Logs. On error pdata.Logs is invalid. +func (t *logsUnmarshaler) Unmarshal(buf []byte) (Logs, error) { + model, err := t.decoder.DecodeLogs(buf) + if err != nil { + return Logs{}, fmt.Errorf("unmarshal failed: %w", err) + } + td, err := t.translator.ToLogs(model) + if err != nil { + return Logs{}, fmt.Errorf("converting model to pdata failed: %w", err) + } + return td, nil +} // Logs is the top-level struct that is propagated through the logs pipeline. // diff --git a/consumer/pdata/logs_test.go b/consumer/pdata/logs_test.go index 4a79fcc26d6..c4a9ee3b7c0 100644 --- a/consumer/pdata/logs_test.go +++ b/consumer/pdata/logs_test.go @@ -15,6 +15,7 @@ package pdata import ( + "errors" "testing" "github.com/stretchr/testify/assert" @@ -25,6 +26,101 @@ import ( otlplogs "go.opentelemetry.io/collector/internal/data/protogen/logs/v1" ) +func TestLogsMarshal_TranslationError(t *testing.T) { + translator := &mockTranslator{} + encoder := &mockEncoder{} + + lm := NewLogsMarshaler(encoder, translator) + ld := NewLogs() + + translator.On("FromLogs", ld).Return(nil, errors.New("translation failed")) + + _, err := lm.Marshal(ld) + assert.Error(t, err) + assert.EqualError(t, err, "converting pdata to model failed: translation failed") +} + +func TestLogsMarshal_SerializeError(t *testing.T) { + translator := &mockTranslator{} + encoder := &mockEncoder{} + + lm := NewLogsMarshaler(encoder, translator) + ld := NewLogs() + expectedModel := struct{}{} + + translator.On("FromLogs", ld).Return(expectedModel, nil) + encoder.On("EncodeLogs", expectedModel).Return(nil, errors.New("serialization failed")) + + _, err := lm.Marshal(ld) + assert.Error(t, err) + assert.EqualError(t, err, "marshal failed: serialization failed") +} + +func TestLogsMarshal_Encode(t *testing.T) { + translator := &mockTranslator{} + encoder := &mockEncoder{} + + lm := NewLogsMarshaler(encoder, translator) + expectedLogs := NewLogs() + expectedBytes := []byte{1, 2, 3} + expectedModel := struct{}{} + + translator.On("FromLogs", expectedLogs).Return(expectedModel, nil) + encoder.On("EncodeLogs", expectedModel).Return(expectedBytes, nil) + + actualBytes, err := lm.Marshal(expectedLogs) + assert.NoError(t, err) + assert.Equal(t, expectedBytes, actualBytes) +} + +func TestLogsUnmarshal_EncodingError(t *testing.T) { + translator := &mockTranslator{} + encoder := &mockEncoder{} + + lu := NewLogsUnmarshaler(encoder, translator) + expectedBytes := []byte{1, 2, 3} + expectedModel := struct{}{} + + encoder.On("DecodeLogs", expectedBytes).Return(expectedModel, errors.New("decode failed")) + + _, err := lu.Unmarshal(expectedBytes) + assert.Error(t, err) + assert.EqualError(t, err, "unmarshal failed: decode failed") +} + +func TestLogsUnmarshal_TranslationError(t *testing.T) { + translator := &mockTranslator{} + encoder := &mockEncoder{} + + lu := NewLogsUnmarshaler(encoder, translator) + expectedBytes := []byte{1, 2, 3} + expectedModel := struct{}{} + + encoder.On("DecodeLogs", expectedBytes).Return(expectedModel, nil) + translator.On("ToLogs", expectedModel).Return(NewLogs(), errors.New("translation failed")) + + _, err := lu.Unmarshal(expectedBytes) + assert.Error(t, err) + assert.EqualError(t, err, "converting model to pdata failed: translation failed") +} + +func TestLogsUnmarshal_Decode(t *testing.T) { + translator := &mockTranslator{} + encoder := &mockEncoder{} + + lu := NewLogsUnmarshaler(encoder, translator) + expectedLogs := NewLogs() + expectedBytes := []byte{1, 2, 3} + expectedModel := struct{}{} + + encoder.On("DecodeLogs", expectedBytes).Return(expectedModel, nil) + translator.On("ToLogs", expectedModel).Return(expectedLogs, nil) + + actualLogs, err := lu.Unmarshal(expectedBytes) + assert.NoError(t, err) + assert.Equal(t, expectedLogs, actualLogs) +} + func TestLogRecordCount(t *testing.T) { md := NewLogs() assert.EqualValues(t, 0, md.LogRecordCount()) diff --git a/consumer/pdata/metrics.go b/consumer/pdata/metrics.go index 9a3bc7475fc..3917ce539f6 100644 --- a/consumer/pdata/metrics.go +++ b/consumer/pdata/metrics.go @@ -15,27 +15,105 @@ package pdata import ( + "fmt" + "go.opentelemetry.io/collector/internal" otlpcollectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1" otlpmetrics "go.opentelemetry.io/collector/internal/data/protogen/metrics/v1" ) -// AggregationTemporality defines how a metric aggregator reports aggregated values. -// It describes how those values relate to the time interval over which they are aggregated. -type AggregationTemporality int32 +// MetricsDecoder is an interface to decode bytes into protocol-specific data model. +type MetricsDecoder interface { + // DecodeMetrics decodes bytes into protocol-specific data model. + // If the error is not nil, the returned interface cannot be used. + DecodeMetrics(buf []byte) (interface{}, error) +} -const ( - // AggregationTemporalityUnspecified is the default AggregationTemporality, it MUST NOT be used. - AggregationTemporalityUnspecified = AggregationTemporality(otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_UNSPECIFIED) - // AggregationTemporalityDelta is an AggregationTemporality for a metric aggregator which reports changes since last report time. - AggregationTemporalityDelta = AggregationTemporality(otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA) - // AggregationTemporalityCumulative is an AggregationTemporality for a metric aggregator which reports changes since a fixed start time. - AggregationTemporalityCumulative = AggregationTemporality(otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE) -) +// MetricsEncoder is an interface to encode protocol-specific data model into bytes. +type MetricsEncoder interface { + // EncodeMetrics encodes protocol-specific data model into bytes. + // If the error is not nil, the returned bytes slice cannot be used. + EncodeMetrics(model interface{}) ([]byte, error) +} -// String returns the string representation of the AggregationTemporality. -func (at AggregationTemporality) String() string { - return otlpmetrics.AggregationTemporality(at).String() +// FromMetricsTranslator is an interface to translate pdata.Metrics into protocol-specific data model. +type FromMetricsTranslator interface { + // FromMetrics translates pdata.Metrics into protocol-specific data model. + // If the error is not nil, the returned pdata.Metrics cannot be used. + FromMetrics(md Metrics) (interface{}, error) +} + +// ToMetricsTranslator is an interface to translate a protocol-specific data model into pdata.Traces. +type ToMetricsTranslator interface { + // ToMetrics translates a protocol-specific data model into pdata.Metrics. + // If the error is not nil, the returned pdata.Metrics cannot be used. + ToMetrics(src interface{}) (Metrics, error) +} + +// MetricsMarshaler marshals pdata.Metrics into bytes. +type MetricsMarshaler interface { + // Marshal the given pdata.Metrics into bytes. + // If the error is not nil, the returned bytes slice cannot be used. + Marshal(td Metrics) ([]byte, error) +} + +type metricsMarshaler struct { + encoder MetricsEncoder + translator FromMetricsTranslator +} + +// NewMetricsMarshaler returns a new MetricsMarshaler. +func NewMetricsMarshaler(encoder MetricsEncoder, translator FromMetricsTranslator) MetricsMarshaler { + return &metricsMarshaler{ + encoder: encoder, + translator: translator, + } +} + +// Marshal pdata.Metrics into bytes. +func (t *metricsMarshaler) Marshal(td Metrics) ([]byte, error) { + model, err := t.translator.FromMetrics(td) + if err != nil { + return nil, fmt.Errorf("converting pdata to model failed: %w", err) + } + buf, err := t.encoder.EncodeMetrics(model) + if err != nil { + return nil, fmt.Errorf("marshal failed: %w", err) + } + return buf, nil +} + +// MetricsUnmarshaler unmarshalls bytes into pdata.Metrics. +type MetricsUnmarshaler interface { + // Unmarshal the given bytes into pdata.Metrics. + // If the error is not nil, the returned pdata.Metrics cannot be used. + Unmarshal(buf []byte) (Metrics, error) +} + +type metricsUnmarshaler struct { + decoder MetricsDecoder + translator ToMetricsTranslator +} + +// NewMetricsUnmarshaler returns a new MetricsUnmarshaler. +func NewMetricsUnmarshaler(decoder MetricsDecoder, translator ToMetricsTranslator) MetricsUnmarshaler { + return &metricsUnmarshaler{ + decoder: decoder, + translator: translator, + } +} + +// Unmarshal bytes into pdata.Metrics. On error pdata.Metrics is invalid. +func (t *metricsUnmarshaler) Unmarshal(buf []byte) (Metrics, error) { + model, err := t.decoder.DecodeMetrics(buf) + if err != nil { + return Metrics{}, fmt.Errorf("unmarshal failed: %w", err) + } + td, err := t.translator.ToMetrics(model) + if err != nil { + return Metrics{}, fmt.Errorf("converting model to pdata failed: %w", err) + } + return td, nil } // Metrics is an opaque interface that allows transition to the new internal Metrics data, but also facilitates the @@ -313,3 +391,21 @@ func copyData(src, dest *otlpmetrics.Metric) { dest.Data = data } } + +// AggregationTemporality defines how a metric aggregator reports aggregated values. +// It describes how those values relate to the time interval over which they are aggregated. +type AggregationTemporality int32 + +const ( + // AggregationTemporalityUnspecified is the default AggregationTemporality, it MUST NOT be used. + AggregationTemporalityUnspecified = AggregationTemporality(otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_UNSPECIFIED) + // AggregationTemporalityDelta is an AggregationTemporality for a metric aggregator which reports changes since last report time. + AggregationTemporalityDelta = AggregationTemporality(otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA) + // AggregationTemporalityCumulative is an AggregationTemporality for a metric aggregator which reports changes since a fixed start time. + AggregationTemporalityCumulative = AggregationTemporality(otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE) +) + +// String returns the string representation of the AggregationTemporality. +func (at AggregationTemporality) String() string { + return otlpmetrics.AggregationTemporality(at).String() +} diff --git a/consumer/pdata/metrics_test.go b/consumer/pdata/metrics_test.go index 477b0fbfa97..0188948f04c 100644 --- a/consumer/pdata/metrics_test.go +++ b/consumer/pdata/metrics_test.go @@ -15,6 +15,7 @@ package pdata import ( + "errors" "testing" gogoproto "github.com/gogo/protobuf/proto" @@ -35,6 +36,101 @@ const ( endTime = uint64(12578940000000054321) ) +func TestMetricsMarshal_TranslationError(t *testing.T) { + translator := &mockTranslator{} + encoder := &mockEncoder{} + + mm := NewMetricsMarshaler(encoder, translator) + md := NewMetrics() + + translator.On("FromMetrics", md).Return(nil, errors.New("translation failed")) + + _, err := mm.Marshal(md) + assert.Error(t, err) + assert.EqualError(t, err, "converting pdata to model failed: translation failed") +} + +func TestMetricsMarshal_SerializeError(t *testing.T) { + translator := &mockTranslator{} + encoder := &mockEncoder{} + + mm := NewMetricsMarshaler(encoder, translator) + md := NewMetrics() + expectedModel := struct{}{} + + translator.On("FromMetrics", md).Return(expectedModel, nil) + encoder.On("EncodeMetrics", expectedModel).Return(nil, errors.New("serialization failed")) + + _, err := mm.Marshal(md) + assert.Error(t, err) + assert.EqualError(t, err, "marshal failed: serialization failed") +} + +func TestMetricsMarshal_Encode(t *testing.T) { + translator := &mockTranslator{} + encoder := &mockEncoder{} + + mm := NewMetricsMarshaler(encoder, translator) + expectedMetrics := NewMetrics() + expectedBytes := []byte{1, 2, 3} + expectedModel := struct{}{} + + translator.On("FromMetrics", expectedMetrics).Return(expectedModel, nil) + encoder.On("EncodeMetrics", expectedModel).Return(expectedBytes, nil) + + actualBytes, err := mm.Marshal(expectedMetrics) + assert.NoError(t, err) + assert.Equal(t, expectedBytes, actualBytes) +} + +func TestMetricsUnmarshal_EncodingError(t *testing.T) { + translator := &mockTranslator{} + encoder := &mockEncoder{} + + mu := NewMetricsUnmarshaler(encoder, translator) + expectedBytes := []byte{1, 2, 3} + expectedModel := struct{}{} + + encoder.On("DecodeMetrics", expectedBytes).Return(expectedModel, errors.New("decode failed")) + + _, err := mu.Unmarshal(expectedBytes) + assert.Error(t, err) + assert.EqualError(t, err, "unmarshal failed: decode failed") +} + +func TestMetricsUnmarshal_TranslationError(t *testing.T) { + translator := &mockTranslator{} + encoder := &mockEncoder{} + + mu := NewMetricsUnmarshaler(encoder, translator) + expectedBytes := []byte{1, 2, 3} + expectedModel := struct{}{} + + encoder.On("DecodeMetrics", expectedBytes).Return(expectedModel, nil) + translator.On("ToMetrics", expectedModel).Return(NewMetrics(), errors.New("translation failed")) + + _, err := mu.Unmarshal(expectedBytes) + assert.Error(t, err) + assert.EqualError(t, err, "converting model to pdata failed: translation failed") +} + +func TestMetricsUnmarshal_Decode(t *testing.T) { + translator := &mockTranslator{} + encoder := &mockEncoder{} + + mu := NewMetricsUnmarshaler(encoder, translator) + expectedMetrics := NewMetrics() + expectedBytes := []byte{1, 2, 3} + expectedModel := struct{}{} + + encoder.On("DecodeMetrics", expectedBytes).Return(expectedModel, nil) + translator.On("ToMetrics", expectedModel).Return(expectedMetrics, nil) + + actualMetrics, err := mu.Unmarshal(expectedBytes) + assert.NoError(t, err) + assert.Equal(t, expectedMetrics, actualMetrics) +} + func TestCopyData(t *testing.T) { tests := []struct { name string diff --git a/internal/model/mocks_test.go b/consumer/pdata/mocks_test.go similarity index 77% rename from internal/model/mocks_test.go rename to consumer/pdata/mocks_test.go index c5b86e8f378..239e0848c16 100644 --- a/internal/model/mocks_test.go +++ b/consumer/pdata/mocks_test.go @@ -12,12 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -package model +package pdata import ( "github.com/stretchr/testify/mock" - - "go.opentelemetry.io/collector/consumer/pdata" ) var ( @@ -80,32 +78,32 @@ type mockTranslator struct { mock.Mock } -func (m *mockTranslator) ToTraces(src interface{}) (pdata.Traces, error) { +func (m *mockTranslator) ToTraces(src interface{}) (Traces, error) { args := m.Called(src) - return args.Get(0).(pdata.Traces), args.Error(1) + return args.Get(0).(Traces), args.Error(1) } -func (m *mockTranslator) FromTraces(md pdata.Traces) (interface{}, error) { +func (m *mockTranslator) FromTraces(md Traces) (interface{}, error) { args := m.Called(md) return args.Get(0), args.Error(1) } -func (m *mockTranslator) ToMetrics(src interface{}) (pdata.Metrics, error) { +func (m *mockTranslator) ToMetrics(src interface{}) (Metrics, error) { args := m.Called(src) - return args.Get(0).(pdata.Metrics), args.Error(1) + return args.Get(0).(Metrics), args.Error(1) } -func (m *mockTranslator) FromMetrics(md pdata.Metrics) (interface{}, error) { +func (m *mockTranslator) FromMetrics(md Metrics) (interface{}, error) { args := m.Called(md) return args.Get(0), args.Error(1) } -func (m *mockTranslator) ToLogs(src interface{}) (pdata.Logs, error) { +func (m *mockTranslator) ToLogs(src interface{}) (Logs, error) { args := m.Called(src) - return args.Get(0).(pdata.Logs), args.Error(1) + return args.Get(0).(Logs), args.Error(1) } -func (m *mockTranslator) FromLogs(md pdata.Logs) (interface{}, error) { +func (m *mockTranslator) FromLogs(md Logs) (interface{}, error) { args := m.Called(md) return args.Get(0), args.Error(1) } diff --git a/consumer/pdata/traces.go b/consumer/pdata/traces.go index 02e97d68ebe..bff03a1bb91 100644 --- a/consumer/pdata/traces.go +++ b/consumer/pdata/traces.go @@ -15,12 +15,106 @@ package pdata import ( + "fmt" + "go.opentelemetry.io/collector/internal" otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1" ) -// This file defines in-memory data structures to represent traces (spans). +// TracesDecoder is an interface to decode bytes into protocol-specific data model. +type TracesDecoder interface { + // DecodeTraces decodes bytes into protocol-specific data model. + // If the error is not nil, the returned interface cannot be used. + DecodeTraces(buf []byte) (interface{}, error) +} + +// TracesEncoder is an interface to encode protocol-specific data model into bytes. +type TracesEncoder interface { + // EncodeTraces encodes protocol-specific data model into bytes. + // If the error is not nil, the returned bytes slice cannot be used. + EncodeTraces(model interface{}) ([]byte, error) +} + +// FromTracesTranslator is an interface to translate pdata.Traces into protocol-specific data model. +type FromTracesTranslator interface { + // FromTraces translates pdata.Traces into protocol-specific data model. + // If the error is not nil, the returned pdata.Traces cannot be used. + FromTraces(td Traces) (interface{}, error) +} + +// ToTracesTranslator is an interface to translate a protocol-specific data model into pdata.Traces. +type ToTracesTranslator interface { + // ToTraces translates a protocol-specific data model into pdata.Traces. + // If the error is not nil, the returned pdata.Traces cannot be used. + ToTraces(src interface{}) (Traces, error) +} + +// TracesMarshaler marshals pdata.Traces into bytes. +type TracesMarshaler interface { + // Marshal the given pdata.Traces into bytes. + // If the error is not nil, the returned bytes slice cannot be used. + Marshal(td Traces) ([]byte, error) +} + +type tracesMarshaler struct { + encoder TracesEncoder + translator FromTracesTranslator +} + +// NewTracesMarshaler returns a new TracesMarshaler. +func NewTracesMarshaler(encoder TracesEncoder, translator FromTracesTranslator) TracesMarshaler { + return &tracesMarshaler{ + encoder: encoder, + translator: translator, + } +} + +// Marshal pdata.Traces into bytes. +func (t *tracesMarshaler) Marshal(td Traces) ([]byte, error) { + model, err := t.translator.FromTraces(td) + if err != nil { + return nil, fmt.Errorf("converting pdata to model failed: %w", err) + } + buf, err := t.encoder.EncodeTraces(model) + if err != nil { + return nil, fmt.Errorf("marshal failed: %w", err) + } + return buf, nil +} + +// TracesUnmarshaler unmarshalls bytes into pdata.Traces. +type TracesUnmarshaler interface { + // Unmarshal the given bytes into pdata.Traces. + // If the error is not nil, the returned pdata.Traces cannot be used. + Unmarshal(buf []byte) (Traces, error) +} + +type tracesUnmarshaler struct { + decoder TracesDecoder + translator ToTracesTranslator +} + +// NewTracesUnmarshaler returns a new TracesUnmarshaler. +func NewTracesUnmarshaler(decoder TracesDecoder, translator ToTracesTranslator) TracesUnmarshaler { + return &tracesUnmarshaler{ + decoder: decoder, + translator: translator, + } +} + +// Unmarshal bytes into pdata.Traces. On error pdata.Traces is invalid. +func (t *tracesUnmarshaler) Unmarshal(buf []byte) (Traces, error) { + model, err := t.decoder.DecodeTraces(buf) + if err != nil { + return Traces{}, fmt.Errorf("unmarshal failed: %w", err) + } + td, err := t.translator.ToTraces(model) + if err != nil { + return Traces{}, fmt.Errorf("converting model to pdata failed: %w", err) + } + return td, nil +} // Traces is the top-level struct that is propagated through the traces pipeline. type Traces struct { diff --git a/consumer/pdata/traces_test.go b/consumer/pdata/traces_test.go index 07edb15e2fc..09ae3318c10 100644 --- a/consumer/pdata/traces_test.go +++ b/consumer/pdata/traces_test.go @@ -15,6 +15,7 @@ package pdata import ( + "errors" "testing" gogoproto "github.com/gogo/protobuf/proto" @@ -28,6 +29,101 @@ import ( otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1" ) +func TestTracesMarshal_TranslationError(t *testing.T) { + translator := &mockTranslator{} + encoder := &mockEncoder{} + + tm := NewTracesMarshaler(encoder, translator) + td := NewTraces() + + translator.On("FromTraces", td).Return(nil, errors.New("translation failed")) + + _, err := tm.Marshal(td) + assert.Error(t, err) + assert.EqualError(t, err, "converting pdata to model failed: translation failed") +} + +func TestTracesMarshal_SerializeError(t *testing.T) { + translator := &mockTranslator{} + encoder := &mockEncoder{} + + tm := NewTracesMarshaler(encoder, translator) + td := NewTraces() + expectedModel := struct{}{} + + translator.On("FromTraces", td).Return(expectedModel, nil) + encoder.On("EncodeTraces", expectedModel).Return(nil, errors.New("serialization failed")) + + _, err := tm.Marshal(td) + assert.Error(t, err) + assert.EqualError(t, err, "marshal failed: serialization failed") +} + +func TestTracesMarshal_Encode(t *testing.T) { + translator := &mockTranslator{} + encoder := &mockEncoder{} + + tm := NewTracesMarshaler(encoder, translator) + expectedTraces := NewTraces() + expectedBytes := []byte{1, 2, 3} + expectedModel := struct{}{} + + translator.On("FromTraces", expectedTraces).Return(expectedModel, nil) + encoder.On("EncodeTraces", expectedModel).Return(expectedBytes, nil) + + actualBytes, err := tm.Marshal(expectedTraces) + assert.NoError(t, err) + assert.Equal(t, expectedBytes, actualBytes) +} + +func TestTracesUnmarshal_EncodingError(t *testing.T) { + translator := &mockTranslator{} + encoder := &mockEncoder{} + + tu := NewTracesUnmarshaler(encoder, translator) + expectedBytes := []byte{1, 2, 3} + expectedModel := struct{}{} + + encoder.On("DecodeTraces", expectedBytes).Return(expectedModel, errors.New("decode failed")) + + _, err := tu.Unmarshal(expectedBytes) + assert.Error(t, err) + assert.EqualError(t, err, "unmarshal failed: decode failed") +} + +func TestTracesUnmarshal_TranslationError(t *testing.T) { + translator := &mockTranslator{} + encoder := &mockEncoder{} + + tu := NewTracesUnmarshaler(encoder, translator) + expectedBytes := []byte{1, 2, 3} + expectedModel := struct{}{} + + encoder.On("DecodeTraces", expectedBytes).Return(expectedModel, nil) + translator.On("ToTraces", expectedModel).Return(NewTraces(), errors.New("translation failed")) + + _, err := tu.Unmarshal(expectedBytes) + assert.Error(t, err) + assert.EqualError(t, err, "converting model to pdata failed: translation failed") +} + +func TestTracesUnmarshal_Decode(t *testing.T) { + translator := &mockTranslator{} + encoder := &mockEncoder{} + + tu := NewTracesUnmarshaler(encoder, translator) + expectedTraces := NewTraces() + expectedBytes := []byte{1, 2, 3} + expectedModel := struct{}{} + + encoder.On("DecodeTraces", expectedBytes).Return(expectedModel, nil) + translator.On("ToTraces", expectedModel).Return(expectedTraces, nil) + + actualTraces, err := tu.Unmarshal(expectedBytes) + assert.NoError(t, err) + assert.Equal(t, expectedTraces, actualTraces) +} + func TestSpanCount(t *testing.T) { md := NewTraces() assert.EqualValues(t, 0, md.SpanCount()) diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index 3b48eb18494..eaab902dbac 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -18,11 +18,10 @@ import ( "github.com/Shopify/sarama" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal/model" ) type pdataLogsMarshaler struct { - marshaler model.LogsMarshaler + marshaler pdata.LogsMarshaler encoding string } @@ -43,7 +42,7 @@ func (p pdataLogsMarshaler) Encoding() string { return p.encoding } -func newPdataLogsMarshaler(marshaler model.LogsMarshaler, encoding string) LogsMarshaler { +func newPdataLogsMarshaler(marshaler pdata.LogsMarshaler, encoding string) LogsMarshaler { return pdataLogsMarshaler{ marshaler: marshaler, encoding: encoding, @@ -51,7 +50,7 @@ func newPdataLogsMarshaler(marshaler model.LogsMarshaler, encoding string) LogsM } type pdataMetricsMarshaler struct { - marshaler model.MetricsMarshaler + marshaler pdata.MetricsMarshaler encoding string } @@ -72,7 +71,7 @@ func (p pdataMetricsMarshaler) Encoding() string { return p.encoding } -func newPdataMetricsMarshaler(marshaler model.MetricsMarshaler, encoding string) MetricsMarshaler { +func newPdataMetricsMarshaler(marshaler pdata.MetricsMarshaler, encoding string) MetricsMarshaler { return pdataMetricsMarshaler{ marshaler: marshaler, encoding: encoding, @@ -80,7 +79,7 @@ func newPdataMetricsMarshaler(marshaler model.MetricsMarshaler, encoding string) } type pdataTracesMarshaler struct { - marshaler model.TracesMarshaler + marshaler pdata.TracesMarshaler encoding string } @@ -101,7 +100,7 @@ func (p pdataTracesMarshaler) Encoding() string { return p.encoding } -func newPdataTracesMarshaler(marshaler model.TracesMarshaler, encoding string) TracesMarshaler { +func newPdataTracesMarshaler(marshaler pdata.TracesMarshaler, encoding string) TracesMarshaler { return pdataTracesMarshaler{ marshaler: marshaler, encoding: encoding, diff --git a/exporter/loggingexporter/logging_exporter.go b/exporter/loggingexporter/logging_exporter.go index 9dda804fb8d..9249d44a065 100644 --- a/exporter/loggingexporter/logging_exporter.go +++ b/exporter/loggingexporter/logging_exporter.go @@ -26,16 +26,15 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/exporter/exporterhelper" - "go.opentelemetry.io/collector/internal/model" "go.opentelemetry.io/collector/internal/otlptext" ) type loggingExporter struct { logger *zap.Logger debug bool - logsMarshaler model.LogsMarshaler - metricsMarshaler model.MetricsMarshaler - tracesMarshaler model.TracesMarshaler + logsMarshaler pdata.LogsMarshaler + metricsMarshaler pdata.MetricsMarshaler + tracesMarshaler pdata.TracesMarshaler } func (s *loggingExporter) pushTraces(_ context.Context, td pdata.Traces) error { diff --git a/internal/model/README.md b/internal/model/README.md deleted file mode 100644 index 932544202c8..00000000000 --- a/internal/model/README.md +++ /dev/null @@ -1,9 +0,0 @@ -# Protocols - -This package provides common ways for decoding serialized bytes into protocol-specific in-memory data models (e.g. Zipkin Span). These data models can then be translated to internal pdata representations. Similarly, pdata can be translated from a data model which can then be serialized into bytes. - -**Encoding**: Common interfaces for serializing/deserializing bytes from/to protocol-specific data models. - -**Translation**: Common interfaces for translating protocol-specific data models from/to pdata. - -**Marshaling**: Common higher level APIs that do both encoding and translation of bytes and data model if going directly pdata ⇔ bytes. diff --git a/internal/model/decoder.go b/internal/model/decoder.go deleted file mode 100644 index aa65b3b80a6..00000000000 --- a/internal/model/decoder.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package model - -// MetricsDecoder is an interface to decode bytes into protocol-specific data model. -type MetricsDecoder interface { - // DecodeMetrics decodes bytes into protocol-specific data model. - // If the error is not nil, the returned interface cannot be used. - DecodeMetrics(buf []byte) (interface{}, error) -} - -// TracesDecoder is an interface to decode bytes into protocol-specific data model. -type TracesDecoder interface { - // DecodeTraces decodes bytes into protocol-specific data model. - // If the error is not nil, the returned interface cannot be used. - DecodeTraces(buf []byte) (interface{}, error) -} - -// LogsDecoder is an interface to decode bytes into protocol-specific data model. -type LogsDecoder interface { - // DecodeLogs decodes bytes into protocol-specific data model. - // If the error is not nil, the returned interface cannot be used. - DecodeLogs(buf []byte) (interface{}, error) -} diff --git a/internal/model/encoder.go b/internal/model/encoder.go deleted file mode 100644 index 37eec1236fb..00000000000 --- a/internal/model/encoder.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package model - -// MetricsEncoder is an interface to encode protocol-specific data model into bytes. -type MetricsEncoder interface { - // EncodeMetrics encodes protocol-specific data model into bytes. - // If the error is not nil, the returned bytes slice cannot be used. - EncodeMetrics(model interface{}) ([]byte, error) -} - -// TracesEncoder is an interface to encode protocol-specific data model into bytes. -type TracesEncoder interface { - // EncodeTraces encodes protocol-specific data model into bytes. - // If the error is not nil, the returned bytes slice cannot be used. - EncodeTraces(model interface{}) ([]byte, error) -} - -// LogsEncoder is an interface to encode protocol-specific data model into bytes. -type LogsEncoder interface { - // EncodeLogs encodes protocol-specific data model into bytes. - // If the error is not nil, the returned bytes slice cannot be used. - EncodeLogs(model interface{}) ([]byte, error) -} diff --git a/internal/model/from_translator.go b/internal/model/from_translator.go deleted file mode 100644 index f890cd81bec..00000000000 --- a/internal/model/from_translator.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package model - -import "go.opentelemetry.io/collector/consumer/pdata" - -// FromMetricsTranslator is an interface to translate pdata.Metrics into protocol-specific data model. -type FromMetricsTranslator interface { - // FromMetrics translates pdata.Metrics into protocol-specific data model. - // If the error is not nil, the returned pdata.Metrics cannot be used. - FromMetrics(md pdata.Metrics) (interface{}, error) -} - -// FromTracesTranslator is an interface to translate pdata.Traces into protocol-specific data model. -type FromTracesTranslator interface { - // FromTraces translates pdata.Traces into protocol-specific data model. - // If the error is not nil, the returned pdata.Traces cannot be used. - FromTraces(td pdata.Traces) (interface{}, error) -} - -// FromLogsTranslator is an interface to translate pdata.Logs into protocol-specific data model. -type FromLogsTranslator interface { - // FromLogs translates pdata.Logs into protocol-specific data model. - // If the error is not nil, the returned pdata.Logs cannot be used. - FromLogs(ld pdata.Logs) (interface{}, error) -} diff --git a/internal/model/marshaler.go b/internal/model/marshaler.go deleted file mode 100644 index 15892234bcc..00000000000 --- a/internal/model/marshaler.go +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package model - -import ( - "fmt" - - "go.opentelemetry.io/collector/consumer/pdata" -) - -// TracesMarshaler marshals pdata.Traces into bytes. -type TracesMarshaler interface { - // Marshal the given pdata.Traces into bytes. - // If the error is not nil, the returned bytes slice cannot be used. - Marshal(td pdata.Traces) ([]byte, error) -} - -type tracesMarshaler struct { - encoder TracesEncoder - translator FromTracesTranslator -} - -// NewTracesMarshaler returns a new TracesMarshaler. -func NewTracesMarshaler(encoder TracesEncoder, translator FromTracesTranslator) TracesMarshaler { - return &tracesMarshaler{ - encoder: encoder, - translator: translator, - } -} - -// Marshal pdata.Traces into bytes. -func (t *tracesMarshaler) Marshal(td pdata.Traces) ([]byte, error) { - model, err := t.translator.FromTraces(td) - if err != nil { - return nil, fmt.Errorf("converting pdata to model failed: %w", err) - } - buf, err := t.encoder.EncodeTraces(model) - if err != nil { - return nil, fmt.Errorf("marshal failed: %w", err) - } - return buf, nil -} - -// MetricsMarshaler marshals pdata.Metrics into bytes. -type MetricsMarshaler interface { - // Marshal the given pdata.Metrics into bytes. - // If the error is not nil, the returned bytes slice cannot be used. - Marshal(td pdata.Metrics) ([]byte, error) -} - -type metricsMarshaler struct { - encoder MetricsEncoder - translator FromMetricsTranslator -} - -// NewMetricsMarshaler returns a new MetricsMarshaler. -func NewMetricsMarshaler(encoder MetricsEncoder, translator FromMetricsTranslator) MetricsMarshaler { - return &metricsMarshaler{ - encoder: encoder, - translator: translator, - } -} - -// Marshal pdata.Metrics into bytes. -func (t *metricsMarshaler) Marshal(td pdata.Metrics) ([]byte, error) { - model, err := t.translator.FromMetrics(td) - if err != nil { - return nil, fmt.Errorf("converting pdata to model failed: %w", err) - } - buf, err := t.encoder.EncodeMetrics(model) - if err != nil { - return nil, fmt.Errorf("marshal failed: %w", err) - } - return buf, nil -} - -// LogsMarshaler marshals pdata.Logs into bytes. -type LogsMarshaler interface { - // Marshal the given pdata.Logs into bytes. - // If the error is not nil, the returned bytes slice cannot be used. - Marshal(td pdata.Logs) ([]byte, error) -} - -type logsMarshaler struct { - encoder LogsEncoder - translator FromLogsTranslator -} - -// NewLogsMarshaler returns a new LogsMarshaler. -func NewLogsMarshaler(encoder LogsEncoder, translator FromLogsTranslator) LogsMarshaler { - return &logsMarshaler{ - encoder: encoder, - translator: translator, - } -} - -// Marshal pdata.Logs into bytes. -func (t *logsMarshaler) Marshal(td pdata.Logs) ([]byte, error) { - model, err := t.translator.FromLogs(td) - if err != nil { - return nil, fmt.Errorf("converting pdata to model failed: %w", err) - } - buf, err := t.encoder.EncodeLogs(model) - if err != nil { - return nil, fmt.Errorf("marshal failed: %w", err) - } - return buf, nil -} diff --git a/internal/model/marshaler_test.go b/internal/model/marshaler_test.go deleted file mode 100644 index 8771eb9efc2..00000000000 --- a/internal/model/marshaler_test.go +++ /dev/null @@ -1,165 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package model - -import ( - "errors" - "testing" - - "github.com/stretchr/testify/assert" - - "go.opentelemetry.io/collector/consumer/pdata" -) - -func TestTracesMarshal_TranslationError(t *testing.T) { - translator := &mockTranslator{} - encoder := &mockEncoder{} - - tm := NewTracesMarshaler(encoder, translator) - td := pdata.NewTraces() - - translator.On("FromTraces", td).Return(nil, errors.New("translation failed")) - - _, err := tm.Marshal(td) - assert.Error(t, err) - assert.EqualError(t, err, "converting pdata to model failed: translation failed") -} - -func TestTracesMarshal_SerializeError(t *testing.T) { - translator := &mockTranslator{} - encoder := &mockEncoder{} - - tm := NewTracesMarshaler(encoder, translator) - td := pdata.NewTraces() - expectedModel := struct{}{} - - translator.On("FromTraces", td).Return(expectedModel, nil) - encoder.On("EncodeTraces", expectedModel).Return(nil, errors.New("serialization failed")) - - _, err := tm.Marshal(td) - assert.Error(t, err) - assert.EqualError(t, err, "marshal failed: serialization failed") -} - -func TestTracesMarshal_Encode(t *testing.T) { - translator := &mockTranslator{} - encoder := &mockEncoder{} - - tm := NewTracesMarshaler(encoder, translator) - expectedTraces := pdata.NewTraces() - expectedBytes := []byte{1, 2, 3} - expectedModel := struct{}{} - - translator.On("FromTraces", expectedTraces).Return(expectedModel, nil) - encoder.On("EncodeTraces", expectedModel).Return(expectedBytes, nil) - - actualBytes, err := tm.Marshal(expectedTraces) - assert.NoError(t, err) - assert.Equal(t, expectedBytes, actualBytes) -} - -func TestMetricsMarshal_TranslationError(t *testing.T) { - translator := &mockTranslator{} - encoder := &mockEncoder{} - - mm := NewMetricsMarshaler(encoder, translator) - md := pdata.NewMetrics() - - translator.On("FromMetrics", md).Return(nil, errors.New("translation failed")) - - _, err := mm.Marshal(md) - assert.Error(t, err) - assert.EqualError(t, err, "converting pdata to model failed: translation failed") -} - -func TestMetricsMarshal_SerializeError(t *testing.T) { - translator := &mockTranslator{} - encoder := &mockEncoder{} - - mm := NewMetricsMarshaler(encoder, translator) - md := pdata.NewMetrics() - expectedModel := struct{}{} - - translator.On("FromMetrics", md).Return(expectedModel, nil) - encoder.On("EncodeMetrics", expectedModel).Return(nil, errors.New("serialization failed")) - - _, err := mm.Marshal(md) - assert.Error(t, err) - assert.EqualError(t, err, "marshal failed: serialization failed") -} - -func TestMetricsMarshal_Encode(t *testing.T) { - translator := &mockTranslator{} - encoder := &mockEncoder{} - - mm := NewMetricsMarshaler(encoder, translator) - expectedMetrics := pdata.NewMetrics() - expectedBytes := []byte{1, 2, 3} - expectedModel := struct{}{} - - translator.On("FromMetrics", expectedMetrics).Return(expectedModel, nil) - encoder.On("EncodeMetrics", expectedModel).Return(expectedBytes, nil) - - actualBytes, err := mm.Marshal(expectedMetrics) - assert.NoError(t, err) - assert.Equal(t, expectedBytes, actualBytes) -} - -func TestLogsMarshal_TranslationError(t *testing.T) { - translator := &mockTranslator{} - encoder := &mockEncoder{} - - lm := NewLogsMarshaler(encoder, translator) - ld := pdata.NewLogs() - - translator.On("FromLogs", ld).Return(nil, errors.New("translation failed")) - - _, err := lm.Marshal(ld) - assert.Error(t, err) - assert.EqualError(t, err, "converting pdata to model failed: translation failed") -} - -func TestLogsMarshal_SerializeError(t *testing.T) { - translator := &mockTranslator{} - encoder := &mockEncoder{} - - lm := NewLogsMarshaler(encoder, translator) - ld := pdata.NewLogs() - expectedModel := struct{}{} - - translator.On("FromLogs", ld).Return(expectedModel, nil) - encoder.On("EncodeLogs", expectedModel).Return(nil, errors.New("serialization failed")) - - _, err := lm.Marshal(ld) - assert.Error(t, err) - assert.EqualError(t, err, "marshal failed: serialization failed") -} - -func TestLogsMarshal_Encode(t *testing.T) { - translator := &mockTranslator{} - encoder := &mockEncoder{} - - lm := NewLogsMarshaler(encoder, translator) - expectedLogs := pdata.NewLogs() - expectedBytes := []byte{1, 2, 3} - expectedModel := struct{}{} - - translator.On("FromLogs", expectedLogs).Return(expectedModel, nil) - encoder.On("EncodeLogs", expectedModel).Return(expectedBytes, nil) - - actualBytes, err := lm.Marshal(expectedLogs) - assert.NoError(t, err) - assert.Equal(t, expectedBytes, actualBytes) -} diff --git a/internal/model/to_translator.go b/internal/model/to_translator.go deleted file mode 100644 index 9bbb7aceccb..00000000000 --- a/internal/model/to_translator.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package model - -import "go.opentelemetry.io/collector/consumer/pdata" - -// ToMetricsTranslator is an interface to translate a protocol-specific data model into pdata.Traces. -type ToMetricsTranslator interface { - // ToMetrics translates a protocol-specific data model into pdata.Metrics. - // If the error is not nil, the returned pdata.Metrics cannot be used. - ToMetrics(src interface{}) (pdata.Metrics, error) -} - -// ToTracesTranslator is an interface to translate a protocol-specific data model into pdata.Traces. -type ToTracesTranslator interface { - // ToTraces translates a protocol-specific data model into pdata.Traces. - // If the error is not nil, the returned pdata.Traces cannot be used. - ToTraces(src interface{}) (pdata.Traces, error) -} - -// ToLogsTranslator is an interface to translate a protocol-specific data model into pdata.Traces. -type ToLogsTranslator interface { - // ToLogs translates a protocol-specific data model into pdata.Logs. - // If the error is not nil, the returned pdata.Logs cannot be used. - ToLogs(src interface{}) (pdata.Logs, error) -} diff --git a/internal/model/unmarshal.go b/internal/model/unmarshal.go deleted file mode 100644 index 1194f58998e..00000000000 --- a/internal/model/unmarshal.go +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package model - -import ( - "fmt" - - "go.opentelemetry.io/collector/consumer/pdata" -) - -// TracesUnmarshaler unmarshalls bytes into pdata.Traces. -type TracesUnmarshaler interface { - // Unmarshal the given bytes into pdata.Traces. - // If the error is not nil, the returned pdata.Traces cannot be used. - Unmarshal(buf []byte) (pdata.Traces, error) -} - -type tracesUnmarshaler struct { - decoder TracesDecoder - translator ToTracesTranslator -} - -// NewTracesUnmarshaler returns a new TracesUnmarshaler. -func NewTracesUnmarshaler(decoder TracesDecoder, translator ToTracesTranslator) TracesUnmarshaler { - return &tracesUnmarshaler{ - decoder: decoder, - translator: translator, - } -} - -// Unmarshal bytes into pdata.Traces. On error pdata.Traces is invalid. -func (t *tracesUnmarshaler) Unmarshal(buf []byte) (pdata.Traces, error) { - model, err := t.decoder.DecodeTraces(buf) - if err != nil { - return pdata.Traces{}, fmt.Errorf("unmarshal failed: %w", err) - } - td, err := t.translator.ToTraces(model) - if err != nil { - return pdata.Traces{}, fmt.Errorf("converting model to pdata failed: %w", err) - } - return td, nil -} - -// MetricsUnmarshaler unmarshalls bytes into pdata.Metrics. -type MetricsUnmarshaler interface { - // Unmarshal the given bytes into pdata.Metrics. - // If the error is not nil, the returned pdata.Metrics cannot be used. - Unmarshal(buf []byte) (pdata.Metrics, error) -} - -type metricsUnmarshaler struct { - decoder MetricsDecoder - translator ToMetricsTranslator -} - -// NewMetricsUnmarshaler returns a new MetricsUnmarshaler. -func NewMetricsUnmarshaler(decoder MetricsDecoder, translator ToMetricsTranslator) MetricsUnmarshaler { - return &metricsUnmarshaler{ - decoder: decoder, - translator: translator, - } -} - -// Unmarshal bytes into pdata.Metrics. On error pdata.Metrics is invalid. -func (t *metricsUnmarshaler) Unmarshal(buf []byte) (pdata.Metrics, error) { - model, err := t.decoder.DecodeMetrics(buf) - if err != nil { - return pdata.Metrics{}, fmt.Errorf("unmarshal failed: %w", err) - } - td, err := t.translator.ToMetrics(model) - if err != nil { - return pdata.Metrics{}, fmt.Errorf("converting model to pdata failed: %w", err) - } - return td, nil -} - -// LogsUnmarshaler unmarshalls bytes into pdata.Logs. -type LogsUnmarshaler interface { - // Unmarshal the given bytes into pdata.Logs. - // If the error is not nil, the returned pdata.Logs cannot be used. - Unmarshal(buf []byte) (pdata.Logs, error) -} - -type logsUnmarshaler struct { - decoder LogsDecoder - translator ToLogsTranslator -} - -// NewLogsUnmarshaler returns a new LogsUnmarshaler. -func NewLogsUnmarshaler(decoder LogsDecoder, translator ToLogsTranslator) LogsUnmarshaler { - return &logsUnmarshaler{ - decoder: decoder, - translator: translator, - } -} - -// Unmarshal bytes into pdata.Logs. On error pdata.Logs is invalid. -func (t *logsUnmarshaler) Unmarshal(buf []byte) (pdata.Logs, error) { - model, err := t.decoder.DecodeLogs(buf) - if err != nil { - return pdata.Logs{}, fmt.Errorf("unmarshal failed: %w", err) - } - td, err := t.translator.ToLogs(model) - if err != nil { - return pdata.Logs{}, fmt.Errorf("converting model to pdata failed: %w", err) - } - return td, nil -} diff --git a/internal/model/unmarshal_test.go b/internal/model/unmarshal_test.go deleted file mode 100644 index 4f864994aae..00000000000 --- a/internal/model/unmarshal_test.go +++ /dev/null @@ -1,168 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package model - -import ( - "errors" - "testing" - - "github.com/stretchr/testify/assert" - - "go.opentelemetry.io/collector/consumer/pdata" -) - -func TestTracesUnmarshal_EncodingError(t *testing.T) { - translator := &mockTranslator{} - encoder := &mockEncoder{} - - tu := NewTracesUnmarshaler(encoder, translator) - expectedBytes := []byte{1, 2, 3} - expectedModel := struct{}{} - - encoder.On("DecodeTraces", expectedBytes).Return(expectedModel, errors.New("decode failed")) - - _, err := tu.Unmarshal(expectedBytes) - assert.Error(t, err) - assert.EqualError(t, err, "unmarshal failed: decode failed") -} - -func TestTracesUnmarshal_TranslationError(t *testing.T) { - translator := &mockTranslator{} - encoder := &mockEncoder{} - - tu := NewTracesUnmarshaler(encoder, translator) - expectedBytes := []byte{1, 2, 3} - expectedModel := struct{}{} - - encoder.On("DecodeTraces", expectedBytes).Return(expectedModel, nil) - translator.On("ToTraces", expectedModel).Return(pdata.NewTraces(), errors.New("translation failed")) - - _, err := tu.Unmarshal(expectedBytes) - assert.Error(t, err) - assert.EqualError(t, err, "converting model to pdata failed: translation failed") -} - -func TestTracesUnmarshal_Decode(t *testing.T) { - translator := &mockTranslator{} - encoder := &mockEncoder{} - - tu := NewTracesUnmarshaler(encoder, translator) - expectedTraces := pdata.NewTraces() - expectedBytes := []byte{1, 2, 3} - expectedModel := struct{}{} - - encoder.On("DecodeTraces", expectedBytes).Return(expectedModel, nil) - translator.On("ToTraces", expectedModel).Return(expectedTraces, nil) - - actualTraces, err := tu.Unmarshal(expectedBytes) - assert.NoError(t, err) - assert.Equal(t, expectedTraces, actualTraces) -} - -func TestMetricsUnmarshal_EncodingError(t *testing.T) { - translator := &mockTranslator{} - encoder := &mockEncoder{} - - mu := NewMetricsUnmarshaler(encoder, translator) - expectedBytes := []byte{1, 2, 3} - expectedModel := struct{}{} - - encoder.On("DecodeMetrics", expectedBytes).Return(expectedModel, errors.New("decode failed")) - - _, err := mu.Unmarshal(expectedBytes) - assert.Error(t, err) - assert.EqualError(t, err, "unmarshal failed: decode failed") -} - -func TestMetricsUnmarshal_TranslationError(t *testing.T) { - translator := &mockTranslator{} - encoder := &mockEncoder{} - - mu := NewMetricsUnmarshaler(encoder, translator) - expectedBytes := []byte{1, 2, 3} - expectedModel := struct{}{} - - encoder.On("DecodeMetrics", expectedBytes).Return(expectedModel, nil) - translator.On("ToMetrics", expectedModel).Return(pdata.NewMetrics(), errors.New("translation failed")) - - _, err := mu.Unmarshal(expectedBytes) - assert.Error(t, err) - assert.EqualError(t, err, "converting model to pdata failed: translation failed") -} - -func TestMetricsUnmarshal_Decode(t *testing.T) { - translator := &mockTranslator{} - encoder := &mockEncoder{} - - mu := NewMetricsUnmarshaler(encoder, translator) - expectedMetrics := pdata.NewMetrics() - expectedBytes := []byte{1, 2, 3} - expectedModel := struct{}{} - - encoder.On("DecodeMetrics", expectedBytes).Return(expectedModel, nil) - translator.On("ToMetrics", expectedModel).Return(expectedMetrics, nil) - - actualMetrics, err := mu.Unmarshal(expectedBytes) - assert.NoError(t, err) - assert.Equal(t, expectedMetrics, actualMetrics) -} - -func TestLogsUnmarshal_EncodingError(t *testing.T) { - translator := &mockTranslator{} - encoder := &mockEncoder{} - - lu := NewLogsUnmarshaler(encoder, translator) - expectedBytes := []byte{1, 2, 3} - expectedModel := struct{}{} - - encoder.On("DecodeLogs", expectedBytes).Return(expectedModel, errors.New("decode failed")) - - _, err := lu.Unmarshal(expectedBytes) - assert.Error(t, err) - assert.EqualError(t, err, "unmarshal failed: decode failed") -} - -func TestLogsUnmarshal_TranslationError(t *testing.T) { - translator := &mockTranslator{} - encoder := &mockEncoder{} - - lu := NewLogsUnmarshaler(encoder, translator) - expectedBytes := []byte{1, 2, 3} - expectedModel := struct{}{} - - encoder.On("DecodeLogs", expectedBytes).Return(expectedModel, nil) - translator.On("ToLogs", expectedModel).Return(pdata.NewLogs(), errors.New("translation failed")) - - _, err := lu.Unmarshal(expectedBytes) - assert.Error(t, err) - assert.EqualError(t, err, "converting model to pdata failed: translation failed") -} - -func TestLogsUnmarshal_Decode(t *testing.T) { - translator := &mockTranslator{} - encoder := &mockEncoder{} - - lu := NewLogsUnmarshaler(encoder, translator) - expectedLogs := pdata.NewLogs() - expectedBytes := []byte{1, 2, 3} - expectedModel := struct{}{} - - encoder.On("DecodeLogs", expectedBytes).Return(expectedModel, nil) - translator.On("ToLogs", expectedModel).Return(expectedLogs, nil) - - actualLogs, err := lu.Unmarshal(expectedBytes) - assert.NoError(t, err) - assert.Equal(t, expectedLogs, actualLogs) -} diff --git a/internal/otlp/json_encoder.go b/internal/otlp/json_encoder.go index e2b06573582..b70d1da652a 100644 --- a/internal/otlp/json_encoder.go +++ b/internal/otlp/json_encoder.go @@ -19,10 +19,10 @@ import ( "github.com/gogo/protobuf/jsonpb" + "go.opentelemetry.io/collector/consumer/pdata" otlpcollectorlogs "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1" otlpcollectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1" otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" - "go.opentelemetry.io/collector/internal/model" ) type jsonEncoder struct { @@ -36,7 +36,7 @@ func newJSONEncoder() *jsonEncoder { func (e *jsonEncoder) EncodeLogs(modelData interface{}) ([]byte, error) { ld, ok := modelData.(*otlpcollectorlogs.ExportLogsServiceRequest) if !ok { - return nil, model.NewErrIncompatibleType(&otlpcollectorlogs.ExportLogsServiceRequest{}, modelData) + return nil, pdata.NewErrIncompatibleType(&otlpcollectorlogs.ExportLogsServiceRequest{}, modelData) } buf := bytes.Buffer{} err := e.delegate.Marshal(&buf, ld) @@ -46,7 +46,7 @@ func (e *jsonEncoder) EncodeLogs(modelData interface{}) ([]byte, error) { func (e *jsonEncoder) EncodeMetrics(modelData interface{}) ([]byte, error) { md, ok := modelData.(*otlpcollectormetrics.ExportMetricsServiceRequest) if !ok { - return nil, model.NewErrIncompatibleType(&otlpcollectormetrics.ExportMetricsServiceRequest{}, modelData) + return nil, pdata.NewErrIncompatibleType(&otlpcollectormetrics.ExportMetricsServiceRequest{}, modelData) } buf := bytes.Buffer{} err := e.delegate.Marshal(&buf, md) @@ -56,7 +56,7 @@ func (e *jsonEncoder) EncodeMetrics(modelData interface{}) ([]byte, error) { func (e *jsonEncoder) EncodeTraces(modelData interface{}) ([]byte, error) { td, ok := modelData.(*otlpcollectortrace.ExportTraceServiceRequest) if !ok { - return nil, model.NewErrIncompatibleType(&otlpcollectortrace.ExportTraceServiceRequest{}, modelData) + return nil, pdata.NewErrIncompatibleType(&otlpcollectortrace.ExportTraceServiceRequest{}, modelData) } buf := bytes.Buffer{} err := e.delegate.Marshal(&buf, td) diff --git a/internal/otlp/marshaler.go b/internal/otlp/marshaler.go index 364b68a71c0..1c363910a8e 100644 --- a/internal/otlp/marshaler.go +++ b/internal/otlp/marshaler.go @@ -15,35 +15,35 @@ package otlp import ( - "go.opentelemetry.io/collector/internal/model" + "go.opentelemetry.io/collector/consumer/pdata" ) // NewJSONTracesMarshaler returns a model.TracesMarshaler. Marshals to OTLP json bytes. -func NewJSONTracesMarshaler() model.TracesMarshaler { - return model.NewTracesMarshaler(newJSONEncoder(), newFromTranslator()) +func NewJSONTracesMarshaler() pdata.TracesMarshaler { + return pdata.NewTracesMarshaler(newJSONEncoder(), newFromTranslator()) } // NewJSONMetricsMarshaler returns a model.MetricsMarshaler. Marshals to OTLP json bytes. -func NewJSONMetricsMarshaler() model.MetricsMarshaler { - return model.NewMetricsMarshaler(newJSONEncoder(), newFromTranslator()) +func NewJSONMetricsMarshaler() pdata.MetricsMarshaler { + return pdata.NewMetricsMarshaler(newJSONEncoder(), newFromTranslator()) } // NewJSONLogsMarshaler returns a model.LogsMarshaler. Marshals to OTLP json bytes. -func NewJSONLogsMarshaler() model.LogsMarshaler { - return model.NewLogsMarshaler(newJSONEncoder(), newFromTranslator()) +func NewJSONLogsMarshaler() pdata.LogsMarshaler { + return pdata.NewLogsMarshaler(newJSONEncoder(), newFromTranslator()) } // NewProtobufTracesMarshaler returns a model.TracesMarshaler. Marshals to OTLP binary protobuf bytes. -func NewProtobufTracesMarshaler() model.TracesMarshaler { - return model.NewTracesMarshaler(newPbEncoder(), newFromTranslator()) +func NewProtobufTracesMarshaler() pdata.TracesMarshaler { + return pdata.NewTracesMarshaler(newPbEncoder(), newFromTranslator()) } // NewProtobufMetricsMarshaler returns a model.MetricsMarshaler. Marshals to OTLP binary protobuf bytes. -func NewProtobufMetricsMarshaler() model.MetricsMarshaler { - return model.NewMetricsMarshaler(newPbEncoder(), newFromTranslator()) +func NewProtobufMetricsMarshaler() pdata.MetricsMarshaler { + return pdata.NewMetricsMarshaler(newPbEncoder(), newFromTranslator()) } // NewProtobufLogsMarshaler returns a model.LogsMarshaler. Marshals to OTLP binary protobuf bytes. -func NewProtobufLogsMarshaler() model.LogsMarshaler { - return model.NewLogsMarshaler(newPbEncoder(), newFromTranslator()) +func NewProtobufLogsMarshaler() pdata.LogsMarshaler { + return pdata.NewLogsMarshaler(newPbEncoder(), newFromTranslator()) } diff --git a/internal/otlp/pb_encoder.go b/internal/otlp/pb_encoder.go index 401c277b95c..dbed24a8359 100644 --- a/internal/otlp/pb_encoder.go +++ b/internal/otlp/pb_encoder.go @@ -15,10 +15,10 @@ package otlp import ( + "go.opentelemetry.io/collector/consumer/pdata" otlpcollectorlogs "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1" otlpcollectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1" otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" - "go.opentelemetry.io/collector/internal/model" ) type pbEncoder struct{} @@ -30,7 +30,7 @@ func newPbEncoder() *pbEncoder { func (e *pbEncoder) EncodeLogs(modelData interface{}) ([]byte, error) { ld, ok := modelData.(*otlpcollectorlogs.ExportLogsServiceRequest) if !ok { - return nil, model.NewErrIncompatibleType(&otlpcollectorlogs.ExportLogsServiceRequest{}, modelData) + return nil, pdata.NewErrIncompatibleType(&otlpcollectorlogs.ExportLogsServiceRequest{}, modelData) } return ld.Marshal() } @@ -38,7 +38,7 @@ func (e *pbEncoder) EncodeLogs(modelData interface{}) ([]byte, error) { func (e *pbEncoder) EncodeMetrics(modelData interface{}) ([]byte, error) { md, ok := modelData.(*otlpcollectormetrics.ExportMetricsServiceRequest) if !ok { - return nil, model.NewErrIncompatibleType(&otlpcollectormetrics.ExportMetricsServiceRequest{}, modelData) + return nil, pdata.NewErrIncompatibleType(&otlpcollectormetrics.ExportMetricsServiceRequest{}, modelData) } return md.Marshal() } @@ -46,7 +46,7 @@ func (e *pbEncoder) EncodeMetrics(modelData interface{}) ([]byte, error) { func (e *pbEncoder) EncodeTraces(modelData interface{}) ([]byte, error) { td, ok := modelData.(*otlpcollectortrace.ExportTraceServiceRequest) if !ok { - return nil, model.NewErrIncompatibleType(&otlpcollectortrace.ExportTraceServiceRequest{}, modelData) + return nil, pdata.NewErrIncompatibleType(&otlpcollectortrace.ExportTraceServiceRequest{}, modelData) } return td.Marshal() } diff --git a/internal/otlp/to_translator.go b/internal/otlp/to_translator.go index 5b347db3a31..720116ad259 100644 --- a/internal/otlp/to_translator.go +++ b/internal/otlp/to_translator.go @@ -20,7 +20,6 @@ import ( otlpcollectorlogs "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1" otlpcollectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1" otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" - "go.opentelemetry.io/collector/internal/model" ) type toTranslator struct{} @@ -32,7 +31,7 @@ func newToTranslator() *toTranslator { func (d *toTranslator) ToLogs(modelData interface{}) (pdata.Logs, error) { ld, ok := modelData.(*otlpcollectorlogs.ExportLogsServiceRequest) if !ok { - return pdata.Logs{}, model.NewErrIncompatibleType(&otlpcollectorlogs.ExportLogsServiceRequest{}, modelData) + return pdata.Logs{}, pdata.NewErrIncompatibleType(&otlpcollectorlogs.ExportLogsServiceRequest{}, modelData) } return pdata.LogsFromInternalRep(internal.LogsFromOtlp(ld)), nil } @@ -40,7 +39,7 @@ func (d *toTranslator) ToLogs(modelData interface{}) (pdata.Logs, error) { func (d *toTranslator) ToMetrics(modelData interface{}) (pdata.Metrics, error) { ld, ok := modelData.(*otlpcollectormetrics.ExportMetricsServiceRequest) if !ok { - return pdata.Metrics{}, model.NewErrIncompatibleType(&otlpcollectormetrics.ExportMetricsServiceRequest{}, modelData) + return pdata.Metrics{}, pdata.NewErrIncompatibleType(&otlpcollectormetrics.ExportMetricsServiceRequest{}, modelData) } return pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(ld)), nil } @@ -48,7 +47,7 @@ func (d *toTranslator) ToMetrics(modelData interface{}) (pdata.Metrics, error) { func (d *toTranslator) ToTraces(modelData interface{}) (pdata.Traces, error) { td, ok := modelData.(*otlpcollectortrace.ExportTraceServiceRequest) if !ok { - return pdata.Traces{}, model.NewErrIncompatibleType(&otlpcollectortrace.ExportTraceServiceRequest{}, modelData) + return pdata.Traces{}, pdata.NewErrIncompatibleType(&otlpcollectortrace.ExportTraceServiceRequest{}, modelData) } return pdata.TracesFromInternalRep(internal.TracesFromOtlp(td)), nil } diff --git a/internal/otlp/unmarshaler.go b/internal/otlp/unmarshaler.go index f7f7fb61f9a..45af9e6774e 100644 --- a/internal/otlp/unmarshaler.go +++ b/internal/otlp/unmarshaler.go @@ -15,35 +15,35 @@ package otlp import ( - "go.opentelemetry.io/collector/internal/model" + "go.opentelemetry.io/collector/consumer/pdata" ) // NewJSONTracesUnmarshaler returns a model.TracesUnmarshaler. Unmarshals from OTLP json bytes. -func NewJSONTracesUnmarshaler() model.TracesUnmarshaler { - return model.NewTracesUnmarshaler(newJSONDecoder(), newToTranslator()) +func NewJSONTracesUnmarshaler() pdata.TracesUnmarshaler { + return pdata.NewTracesUnmarshaler(newJSONDecoder(), newToTranslator()) } // NewJSONMetricsUnmarshaler returns a model.MetricsUnmarshaler. Unmarshals from OTLP json bytes. -func NewJSONMetricsUnmarshaler() model.MetricsUnmarshaler { - return model.NewMetricsUnmarshaler(newJSONDecoder(), newToTranslator()) +func NewJSONMetricsUnmarshaler() pdata.MetricsUnmarshaler { + return pdata.NewMetricsUnmarshaler(newJSONDecoder(), newToTranslator()) } // NewJSONLogsUnmarshaler returns a model.LogsUnmarshaler. Unmarshals from OTLP json bytes. -func NewJSONLogsUnmarshaler() model.LogsUnmarshaler { - return model.NewLogsUnmarshaler(newJSONDecoder(), newToTranslator()) +func NewJSONLogsUnmarshaler() pdata.LogsUnmarshaler { + return pdata.NewLogsUnmarshaler(newJSONDecoder(), newToTranslator()) } // NewProtobufTracesUnmarshaler returns a model.TracesUnmarshaler. Unmarshals from OTLP binary protobuf bytes. -func NewProtobufTracesUnmarshaler() model.TracesUnmarshaler { - return model.NewTracesUnmarshaler(newPbDecoder(), newToTranslator()) +func NewProtobufTracesUnmarshaler() pdata.TracesUnmarshaler { + return pdata.NewTracesUnmarshaler(newPbDecoder(), newToTranslator()) } // NewProtobufMetricsUnmarshaler returns a model.MetricsUnmarshaler. Unmarshals from OTLP binary protobuf bytes. -func NewProtobufMetricsUnmarshaler() model.MetricsUnmarshaler { - return model.NewMetricsUnmarshaler(newPbDecoder(), newToTranslator()) +func NewProtobufMetricsUnmarshaler() pdata.MetricsUnmarshaler { + return pdata.NewMetricsUnmarshaler(newPbDecoder(), newToTranslator()) } // NewProtobufLogsUnmarshaler returns a model.LogsUnmarshaler. Unmarshals from OTLP binary protobuf bytes. -func NewProtobufLogsUnmarshaler() model.LogsUnmarshaler { - return model.NewLogsUnmarshaler(newPbDecoder(), newToTranslator()) +func NewProtobufLogsUnmarshaler() pdata.LogsUnmarshaler { + return pdata.NewLogsUnmarshaler(newPbDecoder(), newToTranslator()) } diff --git a/internal/otlptext/logs.go b/internal/otlptext/logs.go index ae6b3a14e9a..789a4a73f18 100644 --- a/internal/otlptext/logs.go +++ b/internal/otlptext/logs.go @@ -16,11 +16,10 @@ package otlptext import ( "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal/model" ) // NewTextLogsMarshaler returns a serializer.LogsMarshaler to encode to OTLP json bytes. -func NewTextLogsMarshaler() model.LogsMarshaler { +func NewTextLogsMarshaler() pdata.LogsMarshaler { return logsMarshaler{} } diff --git a/internal/otlptext/metrics.go b/internal/otlptext/metrics.go index 5b08fa29c46..47536bc231d 100644 --- a/internal/otlptext/metrics.go +++ b/internal/otlptext/metrics.go @@ -16,11 +16,10 @@ package otlptext import ( "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal/model" ) // NewTextMetricsMarshaler returns a serializer.MetricsMarshaler to encode to OTLP json bytes. -func NewTextMetricsMarshaler() model.MetricsMarshaler { +func NewTextMetricsMarshaler() pdata.MetricsMarshaler { return metricsMarshaler{} } diff --git a/internal/otlptext/traces.go b/internal/otlptext/traces.go index ce12785b2bd..0c3189bdadc 100644 --- a/internal/otlptext/traces.go +++ b/internal/otlptext/traces.go @@ -16,11 +16,10 @@ package otlptext import ( "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal/model" ) // NewTextTracesMarshaler returns a serializer.TracesMarshaler to encode to OTLP json bytes. -func NewTextTracesMarshaler() model.TracesMarshaler { +func NewTextTracesMarshaler() pdata.TracesMarshaler { return tracesMarshaler{} } diff --git a/receiver/kafkareceiver/pdata_unmarshaler.go b/receiver/kafkareceiver/pdata_unmarshaler.go index ab3ea6486ec..224933d4ba0 100644 --- a/receiver/kafkareceiver/pdata_unmarshaler.go +++ b/receiver/kafkareceiver/pdata_unmarshaler.go @@ -15,11 +15,11 @@ package kafkareceiver import ( - "go.opentelemetry.io/collector/internal/model" + "go.opentelemetry.io/collector/consumer/pdata" ) type pdataLogsUnmarshaler struct { - model.LogsUnmarshaler + pdata.LogsUnmarshaler encoding string } @@ -27,7 +27,7 @@ func (p pdataLogsUnmarshaler) Encoding() string { return p.encoding } -func newPdataLogsUnmarshaler(unmarshaler model.LogsUnmarshaler, encoding string) LogsUnmarshaler { +func newPdataLogsUnmarshaler(unmarshaler pdata.LogsUnmarshaler, encoding string) LogsUnmarshaler { return pdataLogsUnmarshaler{ LogsUnmarshaler: unmarshaler, encoding: encoding, @@ -35,7 +35,7 @@ func newPdataLogsUnmarshaler(unmarshaler model.LogsUnmarshaler, encoding string) } type pdataTracesUnmarshaler struct { - model.TracesUnmarshaler + pdata.TracesUnmarshaler encoding string } @@ -43,7 +43,7 @@ func (p pdataTracesUnmarshaler) Encoding() string { return p.encoding } -func newPdataTracesUnmarshaler(unmarshaler model.TracesUnmarshaler, encoding string) TracesUnmarshaler { +func newPdataTracesUnmarshaler(unmarshaler pdata.TracesUnmarshaler, encoding string) TracesUnmarshaler { return pdataTracesUnmarshaler{ TracesUnmarshaler: unmarshaler, encoding: encoding, @@ -51,7 +51,7 @@ func newPdataTracesUnmarshaler(unmarshaler model.TracesUnmarshaler, encoding str } type pdataMetricsUnmarshaler struct { - model.MetricsUnmarshaler + pdata.MetricsUnmarshaler encoding string } @@ -59,7 +59,7 @@ func (p pdataMetricsUnmarshaler) Encoding() string { return p.encoding } -func newPdataMetricsUnmarshaler(unmarshaler model.MetricsUnmarshaler, encoding string) MetricsUnmarshaler { +func newPdataMetricsUnmarshaler(unmarshaler pdata.MetricsUnmarshaler, encoding string) MetricsUnmarshaler { return pdataMetricsUnmarshaler{ MetricsUnmarshaler: unmarshaler, encoding: encoding, diff --git a/receiver/otlpreceiver/otlphttp.go b/receiver/otlpreceiver/otlphttp.go index 911c6af3f96..d194cc84a86 100644 --- a/receiver/otlpreceiver/otlphttp.go +++ b/receiver/otlpreceiver/otlphttp.go @@ -25,7 +25,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "go.opentelemetry.io/collector/internal/model" + "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metrics" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/trace" @@ -38,7 +38,7 @@ func handleTraces( req *http.Request, contentType string, tracesReceiver *trace.Receiver, - tracesUnmarshaler model.TracesUnmarshaler) { + tracesUnmarshaler pdata.TracesUnmarshaler) { body, ok := readAndCloseBody(resp, req, contentType) if !ok { return @@ -65,7 +65,7 @@ func handleMetrics( req *http.Request, contentType string, metricsReceiver *metrics.Receiver, - metricsUnmarshaler model.MetricsUnmarshaler) { + metricsUnmarshaler pdata.MetricsUnmarshaler) { body, ok := readAndCloseBody(resp, req, contentType) if !ok { return @@ -92,7 +92,7 @@ func handleLogs( req *http.Request, contentType string, logsReceiver *logs.Receiver, - logsUnmarshaler model.LogsUnmarshaler) { + logsUnmarshaler pdata.LogsUnmarshaler) { body, ok := readAndCloseBody(resp, req, contentType) if !ok { return diff --git a/receiver/zipkinreceiver/trace_receiver.go b/receiver/zipkinreceiver/trace_receiver.go index beee491ee62..d99cdd82588 100644 --- a/receiver/zipkinreceiver/trace_receiver.go +++ b/receiver/zipkinreceiver/trace_receiver.go @@ -32,7 +32,6 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal/model" "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/translator/trace/zipkinv1" "go.opentelemetry.io/collector/translator/trace/zipkinv2" @@ -58,11 +57,11 @@ type ZipkinReceiver struct { server *http.Server config *Config - v1ThriftUnmarshaler model.TracesUnmarshaler - v1JSONUnmarshaler model.TracesUnmarshaler - jsonUnmarshaler model.TracesUnmarshaler - protobufUnmarshaler model.TracesUnmarshaler - protobufDebugUnmarshaler model.TracesUnmarshaler + v1ThriftUnmarshaler pdata.TracesUnmarshaler + v1JSONUnmarshaler pdata.TracesUnmarshaler + jsonUnmarshaler pdata.TracesUnmarshaler + protobufUnmarshaler pdata.TracesUnmarshaler + protobufDebugUnmarshaler pdata.TracesUnmarshaler } var _ http.Handler = (*ZipkinReceiver)(nil) diff --git a/translator/trace/zipkinv1/json.go b/translator/trace/zipkinv1/json.go index d74c1895a03..7a81eb7c616 100644 --- a/translator/trace/zipkinv1/json.go +++ b/translator/trace/zipkinv1/json.go @@ -29,7 +29,6 @@ import ( "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/idutils" - "go.opentelemetry.io/collector/internal/model" tracetranslator "go.opentelemetry.io/collector/translator/trace" "go.opentelemetry.io/collector/translator/trace/internal/zipkin" ) @@ -47,7 +46,7 @@ var ( errHexIDWrongLen = errors.New("hex Id has wrong length (expected 16)") errHexIDParsing = errors.New("failed to parse hex Id") errHexIDZero = errors.New("ID is zero") - _ model.TracesDecoder = (*jsonDecoder)(nil) + _ pdata.TracesDecoder = (*jsonDecoder)(nil) ) type jsonDecoder struct { @@ -61,8 +60,8 @@ func (j jsonDecoder) DecodeTraces(buf []byte) (interface{}, error) { } // NewJSONTracesUnmarshaler returns an unmarshaler for Zipkin JSON. -func NewJSONTracesUnmarshaler(parseStringTags bool) model.TracesUnmarshaler { - return model.NewTracesUnmarshaler(jsonDecoder{ParseStringTags: parseStringTags}, toTranslator{}) +func NewJSONTracesUnmarshaler(parseStringTags bool) pdata.TracesUnmarshaler { + return pdata.NewTracesUnmarshaler(jsonDecoder{ParseStringTags: parseStringTags}, toTranslator{}) } // Trace translation from Zipkin V1 is a bit of special case since there is no model diff --git a/translator/trace/zipkinv1/thrift.go b/translator/trace/zipkinv1/thrift.go index 73dff84642a..691a7657601 100644 --- a/translator/trace/zipkinv1/thrift.go +++ b/translator/trace/zipkinv1/thrift.go @@ -28,11 +28,11 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" "google.golang.org/protobuf/types/known/timestamppb" + "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/idutils" - "go.opentelemetry.io/collector/internal/model" ) -var _ model.TracesDecoder = (*thriftDecoder)(nil) +var _ pdata.TracesDecoder = (*thriftDecoder)(nil) type thriftDecoder struct{} @@ -46,8 +46,8 @@ func (t thriftDecoder) DecodeTraces(buf []byte) (interface{}, error) { } // NewThriftTracesUnmarshaler returns an unmarshaler for Zipkin Thrift. -func NewThriftTracesUnmarshaler() model.TracesUnmarshaler { - return model.NewTracesUnmarshaler(thriftDecoder{}, toTranslator{}) +func NewThriftTracesUnmarshaler() pdata.TracesUnmarshaler { + return pdata.NewTracesUnmarshaler(thriftDecoder{}, toTranslator{}) } // v1ThriftBatchToOCProto converts Zipkin v1 spans to OC Proto. diff --git a/translator/trace/zipkinv1/to_translator.go b/translator/trace/zipkinv1/to_translator.go index 0819510201e..0b1a7e3a7d7 100644 --- a/translator/trace/zipkinv1/to_translator.go +++ b/translator/trace/zipkinv1/to_translator.go @@ -16,11 +16,10 @@ package zipkinv1 import ( "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal/model" "go.opentelemetry.io/collector/translator/internaldata" ) -var _ model.ToTracesTranslator = (*toTranslator)(nil) +var _ pdata.ToTracesTranslator = (*toTranslator)(nil) type toTranslator struct{} @@ -28,7 +27,7 @@ type toTranslator struct{} func (t toTranslator) ToTraces(src interface{}) (pdata.Traces, error) { ocTraces, ok := src.([]traceData) if !ok { - return pdata.Traces{}, model.NewErrIncompatibleType([]traceData{}, src) + return pdata.Traces{}, pdata.NewErrIncompatibleType([]traceData{}, src) } td := pdata.NewTraces() diff --git a/translator/trace/zipkinv2/from_translator.go b/translator/trace/zipkinv2/from_translator.go index 017fc1fefc9..dad08389d0b 100644 --- a/translator/trace/zipkinv2/from_translator.go +++ b/translator/trace/zipkinv2/from_translator.go @@ -26,7 +26,6 @@ import ( "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/idutils" - "go.opentelemetry.io/collector/internal/model" "go.opentelemetry.io/collector/translator/conventions" tracetranslator "go.opentelemetry.io/collector/translator/trace" "go.opentelemetry.io/collector/translator/trace/internal/zipkin" @@ -39,7 +38,7 @@ const ( var ( sampled = true - _ model.FromTracesTranslator = (*FromTranslator)(nil) + _ pdata.FromTracesTranslator = (*FromTranslator)(nil) ) // FromTranslator converts from pdata to Zipkin data model. diff --git a/translator/trace/zipkinv2/json.go b/translator/trace/zipkinv2/json.go index 7274589866a..c9b5a774da6 100644 --- a/translator/trace/zipkinv2/json.go +++ b/translator/trace/zipkinv2/json.go @@ -20,10 +20,10 @@ import ( zipkinmodel "github.com/openzipkin/zipkin-go/model" zipkinreporter "github.com/openzipkin/zipkin-go/reporter" - "go.opentelemetry.io/collector/internal/model" + "go.opentelemetry.io/collector/consumer/pdata" ) -var _ model.TracesDecoder = (*jsonDecoder)(nil) +var _ pdata.TracesDecoder = (*jsonDecoder)(nil) type jsonDecoder struct{} @@ -36,7 +36,7 @@ func (j jsonDecoder) DecodeTraces(buf []byte) (interface{}, error) { return spans, nil } -var _ model.TracesEncoder = (*jsonEncoder)(nil) +var _ pdata.TracesEncoder = (*jsonEncoder)(nil) type jsonEncoder struct { serializer zipkinreporter.JSONSerializer @@ -46,17 +46,17 @@ type jsonEncoder struct { func (j jsonEncoder) EncodeTraces(mod interface{}) ([]byte, error) { spans, ok := mod.([]*zipkinmodel.SpanModel) if !ok { - return nil, model.NewErrIncompatibleType([]*zipkinmodel.SpanModel{}, mod) + return nil, pdata.NewErrIncompatibleType([]*zipkinmodel.SpanModel{}, mod) } return j.serializer.Serialize(spans) } // NewJSONTracesUnmarshaler returns an unmarshaler for JSON bytes. -func NewJSONTracesUnmarshaler(parseStringTags bool) model.TracesUnmarshaler { - return model.NewTracesUnmarshaler(jsonDecoder{}, ToTranslator{ParseStringTags: parseStringTags}) +func NewJSONTracesUnmarshaler(parseStringTags bool) pdata.TracesUnmarshaler { + return pdata.NewTracesUnmarshaler(jsonDecoder{}, ToTranslator{ParseStringTags: parseStringTags}) } // NewJSONTracesMarshaler returns a marshaler to JSON bytes. -func NewJSONTracesMarshaler() model.TracesMarshaler { - return model.NewTracesMarshaler(jsonEncoder{}, FromTranslator{}) +func NewJSONTracesMarshaler() pdata.TracesMarshaler { + return pdata.NewTracesMarshaler(jsonEncoder{}, FromTranslator{}) } diff --git a/translator/trace/zipkinv2/json_test.go b/translator/trace/zipkinv2/json_test.go index 62ae0229fd3..0b104b1bbdb 100644 --- a/translator/trace/zipkinv2/json_test.go +++ b/translator/trace/zipkinv2/json_test.go @@ -22,7 +22,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/internal/model" + "go.opentelemetry.io/collector/consumer/pdata" ) func TestJSONDecoder_DecodeTraces(t *testing.T) { @@ -59,11 +59,11 @@ func TestJSONEncoder_EncodeTracesError(t *testing.T) { func TestNewJSONTracesUnmarshaler(t *testing.T) { m := NewJSONTracesUnmarshaler(false) assert.NotNil(t, m) - assert.Implements(t, (*model.TracesUnmarshaler)(nil), m) + assert.Implements(t, (*pdata.TracesUnmarshaler)(nil), m) } func TestNewJSONTracesMarshaler(t *testing.T) { m := NewJSONTracesMarshaler() assert.NotNil(t, m) - assert.Implements(t, (*model.TracesMarshaler)(nil), m) + assert.Implements(t, (*pdata.TracesMarshaler)(nil), m) } diff --git a/translator/trace/zipkinv2/protobuf.go b/translator/trace/zipkinv2/protobuf.go index 42e75ac0164..8e7ffba6111 100644 --- a/translator/trace/zipkinv2/protobuf.go +++ b/translator/trace/zipkinv2/protobuf.go @@ -18,10 +18,10 @@ import ( zipkinmodel "github.com/openzipkin/zipkin-go/model" "github.com/openzipkin/zipkin-go/proto/zipkin_proto3" - "go.opentelemetry.io/collector/internal/model" + "go.opentelemetry.io/collector/consumer/pdata" ) -var _ model.TracesDecoder = (*protobufDecoder)(nil) +var _ pdata.TracesDecoder = (*protobufDecoder)(nil) type protobufDecoder struct { // DebugWasSet toggles the Debug field of each Span. It is usually set to true if @@ -38,7 +38,7 @@ func (p protobufDecoder) DecodeTraces(buf []byte) (interface{}, error) { return spans, nil } -var _ model.TracesEncoder = (*protobufEncoder)(nil) +var _ pdata.TracesEncoder = (*protobufEncoder)(nil) type protobufEncoder struct { serializer zipkin_proto3.SpanSerializer @@ -48,20 +48,20 @@ type protobufEncoder struct { func (p protobufEncoder) EncodeTraces(mod interface{}) ([]byte, error) { spans, ok := mod.([]*zipkinmodel.SpanModel) if !ok { - return nil, model.NewErrIncompatibleType([]*zipkinmodel.SpanModel{}, mod) + return nil, pdata.NewErrIncompatibleType([]*zipkinmodel.SpanModel{}, mod) } return p.serializer.Serialize(spans) } // NewProtobufTracesUnmarshaler returns an unmarshaler of protobuf bytes. -func NewProtobufTracesUnmarshaler(debugWasSet, parseStringTags bool) model.TracesUnmarshaler { - return model.NewTracesUnmarshaler( +func NewProtobufTracesUnmarshaler(debugWasSet, parseStringTags bool) pdata.TracesUnmarshaler { + return pdata.NewTracesUnmarshaler( protobufDecoder{DebugWasSet: debugWasSet}, ToTranslator{ParseStringTags: parseStringTags}, ) } // NewProtobufTracesMarshaler returns a new marshaler to protobuf bytes. -func NewProtobufTracesMarshaler() model.TracesMarshaler { - return model.NewTracesMarshaler(protobufEncoder{}, FromTranslator{}) +func NewProtobufTracesMarshaler() pdata.TracesMarshaler { + return pdata.NewTracesMarshaler(protobufEncoder{}, FromTranslator{}) } diff --git a/translator/trace/zipkinv2/protobuf_test.go b/translator/trace/zipkinv2/protobuf_test.go index 80b98175419..f0a8be4303b 100644 --- a/translator/trace/zipkinv2/protobuf_test.go +++ b/translator/trace/zipkinv2/protobuf_test.go @@ -23,7 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/internal/model" + "go.opentelemetry.io/collector/consumer/pdata" ) func TestProtobufDecoder_DecodeTraces(t *testing.T) { @@ -67,11 +67,11 @@ func TestProtobufEncoder_EncodeTracesError(t *testing.T) { func TestNewProtobufTracesUnmarshaler(t *testing.T) { m := NewProtobufTracesUnmarshaler(false, false) assert.NotNil(t, m) - assert.Implements(t, (*model.TracesUnmarshaler)(nil), m) + assert.Implements(t, (*pdata.TracesUnmarshaler)(nil), m) } func TestNewProtobufTracesMarshaler(t *testing.T) { m := NewProtobufTracesMarshaler() assert.NotNil(t, m) - assert.Implements(t, (*model.TracesMarshaler)(nil), m) + assert.Implements(t, (*pdata.TracesMarshaler)(nil), m) } diff --git a/translator/trace/zipkinv2/to_translator.go b/translator/trace/zipkinv2/to_translator.go index 0eafcd5cb6f..05bf2db5d4f 100644 --- a/translator/trace/zipkinv2/to_translator.go +++ b/translator/trace/zipkinv2/to_translator.go @@ -29,14 +29,13 @@ import ( "go.opentelemetry.io/collector/internal/data" otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1" "go.opentelemetry.io/collector/internal/idutils" - "go.opentelemetry.io/collector/internal/model" "go.opentelemetry.io/collector/internal/occonventions" "go.opentelemetry.io/collector/translator/conventions" tracetranslator "go.opentelemetry.io/collector/translator/trace" "go.opentelemetry.io/collector/translator/trace/internal/zipkin" ) -var _ model.ToTracesTranslator = (*ToTranslator)(nil) +var _ pdata.ToTracesTranslator = (*ToTranslator)(nil) // ToTranslator converts from Zipkin data model to pdata. type ToTranslator struct { @@ -48,7 +47,7 @@ type ToTranslator struct { func (t ToTranslator) ToTraces(src interface{}) (pdata.Traces, error) { zipkinSpans, ok := src.([]*zipkinmodel.SpanModel) if !ok { - return pdata.Traces{}, model.NewErrIncompatibleType([]*zipkinmodel.SpanModel{}, src) + return pdata.Traces{}, pdata.NewErrIncompatibleType([]*zipkinmodel.SpanModel{}, src) } traceData := pdata.NewTraces()