From ff262866282a119c9f67412305a65566dfd82527 Mon Sep 17 00:00:00 2001 From: metapox <84276752+metapox@users.noreply.github.com> Date: Fri, 10 Mar 2023 09:30:18 +0900 Subject: [PATCH] tmp for share --- cmd/ingester/app/builder/builder.go | 2 ++ plugin/storage/kafka/options.go | 1 + plugin/storage/kafka/unmarshaller.go | 28 ++++++++++++++++++++++++++++ 3 files changed, 31 insertions(+) diff --git a/cmd/ingester/app/builder/builder.go b/cmd/ingester/app/builder/builder.go index d984b5dcd03..d0fd36c02bd 100644 --- a/cmd/ingester/app/builder/builder.go +++ b/cmd/ingester/app/builder/builder.go @@ -39,6 +39,8 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit unmarshaller = kafka.NewProtobufUnmarshaller() case kafka.EncodingZipkinThrift: unmarshaller = kafka.NewZipkinThriftUnmarshaller() + case kafka.EncodingOtlpProto: + unmarshaller = kafka.NewOtlpProtoUnmarshaller() default: return nil, fmt.Errorf(`encoding '%s' not recognised, use one of ("%s")`, options.Encoding, strings.Join(kafka.AllEncodings, "\", \"")) diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index 9efdd64784a..1918b85a5c4 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -34,6 +34,7 @@ const ( EncodingProto = "protobuf" // EncodingZipkinThrift is used for spans encoded as Zipkin Thrift. EncodingZipkinThrift = "zipkin-thrift" + EncodingOtlpJSON = "otlp-json" configPrefix = "kafka.producer" suffixBrokers = ".brokers" diff --git a/plugin/storage/kafka/unmarshaller.go b/plugin/storage/kafka/unmarshaller.go index 24161bd3940..eb48ef3a587 100644 --- a/plugin/storage/kafka/unmarshaller.go +++ b/plugin/storage/kafka/unmarshaller.go @@ -20,9 +20,11 @@ import ( "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/proto" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin" + otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" ) // Unmarshaller decodes a byte array to a span @@ -80,3 +82,29 @@ func (h *ZipkinThriftUnmarshaller) Unmarshal(msg []byte) (*model.Span, error) { } return mSpans[0], err } + +type OtlpJSONUnmarshaller struct{} + +func NewOtlpJSONUnmarshaller() *OtlpJSONUnmarshaller { + return &OtlpJSONUnmarshaller{} +} + +func (OtlpJSONUnmarshaller) Unmarshal(buf []byte) (*model.Span, error) { + req := ptraceotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + if err != nil { + return nil, err + } + + batch, err := otlp2jaeger.ProtoFromTraces(req.Traces()) + if err != nil { + return nil, err + } + + // check span & raise err + // if span { + + // } + + return batch[0].Spans[0], nil +}