Skip to content

Commit

Permalink
zipkinv2: add encoders, decoders, marshalers
Browse files Browse the repository at this point in the history
Also update zipkin receiver to use them.
  • Loading branch information
jrcamp committed Jun 11, 2021
1 parent fd38d74 commit 992993d
Show file tree
Hide file tree
Showing 9 changed files with 320 additions and 55 deletions.
20 changes: 5 additions & 15 deletions receiver/kafkareceiver/zipkin_unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ package kafkareceiver

import (
"context"
"encoding/json"

"github.com/apache/thrift/lib/go/thrift"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
zipkinmodel "github.com/openzipkin/zipkin-go/model"
"github.com/openzipkin/zipkin-go/proto/zipkin_proto3"

"go.opentelemetry.io/collector/consumer/pdata"
zipkintranslator "go.opentelemetry.io/collector/translator/trace/zipkin"
Expand All @@ -32,16 +29,13 @@ type zipkinProtoSpanUnmarshaler struct {
}

var (
_ TracesUnmarshaler = (*zipkinProtoSpanUnmarshaler)(nil)
toTranslator = zipkinv2.ToTranslator{ParseStringTags: false}
_ TracesUnmarshaler = (*zipkinProtoSpanUnmarshaler)(nil)
zipkinProtobufUnmarshaler = zipkinv2.NewProtobufTracesUnmarshaler(false, false)
zipkinJSONUnmarshaler = zipkinv2.NewJSONTracesUnmarshaler(false)
)

func (z zipkinProtoSpanUnmarshaler) Unmarshal(bytes []byte) (pdata.Traces, error) {
parseSpans, err := zipkin_proto3.ParseSpans(bytes, false)
if err != nil {
return pdata.NewTraces(), err
}
return toTranslator.ToTraces(parseSpans)
return zipkinProtobufUnmarshaler.Unmarshal(bytes)
}

func (z zipkinProtoSpanUnmarshaler) Encoding() string {
Expand All @@ -54,11 +48,7 @@ type zipkinJSONSpanUnmarshaler struct {
var _ TracesUnmarshaler = (*zipkinJSONSpanUnmarshaler)(nil)

func (z zipkinJSONSpanUnmarshaler) Unmarshal(bytes []byte) (pdata.Traces, error) {
var spans []*zipkinmodel.SpanModel
if err := json.Unmarshal(bytes, &spans); err != nil {
return pdata.NewTraces(), err
}
return toTranslator.ToTraces(spans)
return zipkinJSONUnmarshaler.Unmarshal(bytes)
}

func (z zipkinJSONSpanUnmarshaler) Encoding() string {
Expand Down
4 changes: 2 additions & 2 deletions receiver/kafkareceiver/zipkin_unmarshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ func TestUnmarshalZipkinThrift_error(t *testing.T) {
func TestUnmarshalZipkinJSON_error(t *testing.T) {
p := zipkinJSONSpanUnmarshaler{}
got, err := p.Unmarshal([]byte("+$%"))
assert.Equal(t, pdata.NewTraces(), got)
assert.Equal(t, pdata.Traces{}, got)
assert.Error(t, err)
}

func TestUnmarshalZipkinProto_error(t *testing.T) {
p := zipkinProtoSpanUnmarshaler{}
got, err := p.Unmarshal([]byte("+$%"))
assert.Equal(t, pdata.NewTraces(), got)
assert.Equal(t, pdata.Traces{}, got)
assert.Error(t, err)
}
14 changes: 11 additions & 3 deletions receiver/zipkinreceiver/proto_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (
"go.opentelemetry.io/collector/translator/trace/zipkinv2"
)

var translator = zipkinv2.ToTranslator{ParseStringTags: false}

func TestConvertSpansToTraceSpans_protobuf(t *testing.T) {
// TODO: (@odeke-em) examine the entire codepath that time goes through
// in Zipkin-Go to ensure that all rounding matches. Otherwise
Expand Down Expand Up @@ -100,7 +98,7 @@ func TestConvertSpansToTraceSpans_protobuf(t *testing.T) {
// 2. Serialize it
protoBlob, err := proto.Marshal(payloadFromWild)
require.NoError(t, err, "Failed to protobuf serialize payload: %v", err)
zi := &ZipkinReceiver{translator: translator, config: createDefaultConfig().(*Config)}
zi := newTestZipkinReceiver()
hdr := make(http.Header)
hdr.Set("Content-Type", "application/x-protobuf")

Expand Down Expand Up @@ -166,6 +164,16 @@ func TestConvertSpansToTraceSpans_protobuf(t *testing.T) {
}
}

func newTestZipkinReceiver() *ZipkinReceiver {
cfg := createDefaultConfig().(*Config)
return &ZipkinReceiver{
config: cfg,
jsonUnmarshaler: zipkinv2.NewJSONTracesUnmarshaler(cfg.ParseStringTags),
protobufUnmarshaler: zipkinv2.NewProtobufTracesUnmarshaler(false, cfg.ParseStringTags),
protobufDebugUnmarshaler: zipkinv2.NewProtobufTracesUnmarshaler(true, cfg.ParseStringTags),
}
}

func compareResourceSpans(t *testing.T, wantRS pdata.ResourceSpans, reqsRS pdata.ResourceSpans) {
assert.Equal(t, wantRS.InstrumentationLibrarySpans().Len(), reqsRS.InstrumentationLibrarySpans().Len())
wantIL := wantRS.InstrumentationLibrarySpans().At(0)
Expand Down
53 changes: 22 additions & 31 deletions receiver/zipkinreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"compress/gzip"
"compress/zlib"
"context"
"encoding/json"
"errors"
"io"
"io/ioutil"
Expand All @@ -28,8 +27,6 @@ import (
"sync"

jaegerzipkin "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin"
zipkinmodel "github.com/openzipkin/zipkin-go/model"
"github.com/openzipkin/zipkin-go/proto/zipkin_proto3"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -62,10 +59,12 @@ type ZipkinReceiver struct {
nextConsumer consumer.Traces
id config.ComponentID

shutdownWG sync.WaitGroup
server *http.Server
config *Config
translator model.ToTracesTranslator
shutdownWG sync.WaitGroup
server *http.Server
config *Config
jsonUnmarshaler model.TracesUnmarshaler
protobufUnmarshaler model.TracesUnmarshaler
protobufDebugUnmarshaler model.TracesUnmarshaler
}

var _ http.Handler = (*ZipkinReceiver)(nil)
Expand All @@ -77,10 +76,12 @@ func New(config *Config, nextConsumer consumer.Traces) (*ZipkinReceiver, error)
}

zr := &ZipkinReceiver{
nextConsumer: nextConsumer,
id: config.ID(),
config: config,
translator: zipkinv2.ToTranslator{ParseStringTags: config.ParseStringTags},
nextConsumer: nextConsumer,
id: config.ID(),
config: config,
jsonUnmarshaler: zipkinv2.NewJSONTracesUnmarshaler(config.ParseStringTags),
protobufUnmarshaler: zipkinv2.NewProtobufTracesUnmarshaler(false, config.ParseStringTags),
protobufDebugUnmarshaler: zipkinv2.NewProtobufTracesUnmarshaler(true, config.ParseStringTags),
}
return zr, nil
}
Expand Down Expand Up @@ -132,30 +133,20 @@ func (zr *ZipkinReceiver) v2ToTraceSpans(blob []byte, hdr http.Header) (reqs pda
// https://github.com/openzipkin/zipkin-go/blob/3793c981d4f621c0e3eb1457acffa2c1cc591384/proto/v2/zipkin.proto#L154
debugWasSet := hdr.Get("X-B3-Flags") == "1"

var zipkinSpans []*zipkinmodel.SpanModel
// By default, we'll assume using JSON
unmarshaler := zr.jsonUnmarshaler

// Zipkin can send protobuf via http
switch hdr.Get("Content-Type") {
// TODO: (@odeke-em) record the unique types of Content-Type uploads
case "application/x-protobuf":
zipkinSpans, err = zipkin_proto3.ParseSpans(blob, debugWasSet)

default: // By default, we'll assume using JSON
zipkinSpans, err = zr.deserializeFromJSON(blob)
}

if err != nil {
return pdata.Traces{}, err
if hdr.Get("Content-Type") == "application/x-protobuf" {
// TODO: (@odeke-em) record the unique types of Content-Type uploads
if debugWasSet {
unmarshaler = zr.protobufDebugUnmarshaler
} else {
unmarshaler = zr.protobufUnmarshaler
}
}

return zr.translator.ToTraces(zipkinSpans)
}

func (zr *ZipkinReceiver) deserializeFromJSON(jsonBlob []byte) (zs []*zipkinmodel.SpanModel, err error) {
if err = json.Unmarshal(jsonBlob, &zs); err != nil {
return nil, err
}
return zs, nil
return unmarshaler.Unmarshal(blob)
}

// Shutdown tells the receiver that should stop reception,
Expand Down
9 changes: 5 additions & 4 deletions receiver/zipkinreceiver/trace_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestConvertSpansToTraceSpans_json(t *testing.T) {
// Using Adrian Cole's sample at https://gist.github.com/adriancole/e8823c19dfed64e2eb71
blob, err := ioutil.ReadFile("./testdata/sample1.json")
require.NoError(t, err, "Failed to read sample JSON file: %v", err)
zi := &ZipkinReceiver{translator: translator, config: createDefaultConfig().(*Config)}
zi := newTestZipkinReceiver()
reqs, err := zi.v2ToTraceSpans(blob, nil)
require.NoError(t, err, "Failed to parse convert Zipkin spans in JSON to Trace spans: %v", err)

Expand Down Expand Up @@ -207,7 +207,8 @@ func TestConversionRoundtrip(t *testing.T) {
}
}]`)

zi := &ZipkinReceiver{translator: translator, nextConsumer: consumertest.NewNop()}
zi := newTestZipkinReceiver()
zi.nextConsumer = consumertest.NewNop()
zi.config = &Config{}
ereqs, err := zi.v2ToTraceSpans(receiverInputJSON, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -423,7 +424,7 @@ func TestReceiverInvalidContentType(t *testing.T) {
zr.ServeHTTP(req, r)

require.Equal(t, 400, req.Code)
require.Equal(t, "invalid character 'i' looking for beginning of object key string\n", req.Body.String())
require.Equal(t, "unmarshal failed: invalid character 'i' looking for beginning of object key string\n", req.Body.String())
}

func TestReceiverConsumerError(t *testing.T) {
Expand Down Expand Up @@ -504,7 +505,7 @@ func compressZlib(body []byte) (*bytes.Buffer, error) {
func TestConvertSpansToTraceSpans_JSONWithoutSerivceName(t *testing.T) {
blob, err := ioutil.ReadFile("./testdata/sample2.json")
require.NoError(t, err, "Failed to read sample JSON file: %v", err)
zi := &ZipkinReceiver{translator: translator, config: createDefaultConfig().(*Config)}
zi := newTestZipkinReceiver()
reqs, err := zi.v2ToTraceSpans(blob, nil)
require.NoError(t, err, "Failed to parse convert Zipkin spans in JSON to Trace spans: %v", err)

Expand Down
62 changes: 62 additions & 0 deletions translator/trace/zipkinv2/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zipkinv2

import (
"encoding/json"

zipkinmodel "github.com/openzipkin/zipkin-go/model"
zipkinreporter "github.com/openzipkin/zipkin-go/reporter"

"go.opentelemetry.io/collector/internal/model"
)

var _ model.TracesDecoder = (*jsonDecoder)(nil)

type jsonDecoder struct{}

// DecodeTraces from JSON bytes to zipkin model.
func (j jsonDecoder) DecodeTraces(buf []byte) (interface{}, error) {
var spans []*zipkinmodel.SpanModel
if err := json.Unmarshal(buf, &spans); err != nil {
return nil, err
}
return spans, nil
}

var _ model.TracesEncoder = (*jsonEncoder)(nil)

type jsonEncoder struct {
serializer zipkinreporter.JSONSerializer
}

// EncodeTraces from zipkin model to bytes.
func (j jsonEncoder) EncodeTraces(mod interface{}) ([]byte, error) {
spans, ok := mod.([]*zipkinmodel.SpanModel)
if !ok {
return nil, model.NewErrIncompatibleType([]*zipkinmodel.SpanModel{}, mod)
}
return j.serializer.Serialize(spans)
}

// NewJSONTracesUnmarshaler returns an unmarshaler for JSON bytes.
func NewJSONTracesUnmarshaler(parseStringTags bool) model.TracesUnmarshaler {
return model.NewTracesUnmarshaler(jsonDecoder{}, ToTranslator{ParseStringTags: parseStringTags})
}

// NewJSONTracesMarshaler returns a marshaler to JSON bytes.
func NewJSONTracesMarshaler() model.TracesMarshaler {
return model.NewTracesMarshaler(jsonEncoder{}, FromTranslator{})
}
69 changes: 69 additions & 0 deletions translator/trace/zipkinv2/json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zipkinv2

import (
"io/ioutil"
"testing"

zipkinmodel "github.com/openzipkin/zipkin-go/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/internal/model"
)

func TestJSONDecoder_DecodeTraces(t *testing.T) {
data, err := ioutil.ReadFile("testdata/zipkin_v2_single.json")
require.NoError(t, err)
decoder := jsonDecoder{}
spans, err := decoder.DecodeTraces(data)
assert.NoError(t, err)
assert.NotNil(t, spans)
assert.IsType(t, []*zipkinmodel.SpanModel{}, spans)
}

func TestJSONDecoder_DecodeTracesError(t *testing.T) {
decoder := jsonDecoder{}
spans, err := decoder.DecodeTraces([]byte("{"))
assert.Error(t, err)
assert.Nil(t, spans)
}

func TestJSONEncoder_EncodeTraces(t *testing.T) {
encoder := jsonEncoder{}
buf, err := encoder.EncodeTraces(generateSpanErrorTags())
assert.NoError(t, err)
assert.Greater(t, len(buf), 1)
}

func TestJSONEncoder_EncodeTracesError(t *testing.T) {
encoder := jsonEncoder{}
buf, err := encoder.EncodeTraces(nil)
assert.Error(t, err)
assert.Nil(t, buf)
}

func TestNewJSONTracesUnmarshaler(t *testing.T) {
m := NewJSONTracesUnmarshaler(false)
assert.NotNil(t, m)
assert.Implements(t, (*model.TracesUnmarshaler)(nil), m)
}

func TestNewJSONTracesMarshaler(t *testing.T) {
m := NewJSONTracesMarshaler()
assert.NotNil(t, m)
assert.Implements(t, (*model.TracesMarshaler)(nil), m)
}
Loading

0 comments on commit 992993d

Please sign in to comment.