Skip to content

Commit

Permalink
feat: process message chunks for kafka exporter
Browse files Browse the repository at this point in the history
Signed-off-by: Shivanshu Raj Shrivastava <[email protected]>
  • Loading branch information
shivanshuraj1333 committed Jan 13, 2025
1 parent bc20ac9 commit cfe6488
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 36 deletions.
15 changes: 11 additions & 4 deletions exporter/kafkaexporter/jaeger_marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,27 @@ import (
)

// jaegerMarshaler marshals ptrace.Traces into Jaeger-encoded Kafka
// producer message chunks.
type jaegerMarshaler struct {
	// marshaler serializes a single Jaeger span (proto or JSON).
	marshaler jaegerSpanMarshaler
	// partitionedByTraceID selects per-trace partitioning
	// (not yet implemented in Marshal — see its ToDo).
	partitionedByTraceID bool
	// maxMessageBytes is the intended per-chunk size cap
	// (chunking is still a ToDo in Marshal).
	maxMessageBytes int
}

var _ TracesMarshaler = (*jaegerMarshaler)(nil)

func (j jaegerMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) {
func (j jaegerMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*ProducerMessageChunks, error) {
// ToDo: implement partitionedByTraceID

batches := jaeger.ProtoFromTraces(traces)
var messages []*sarama.ProducerMessage

// ToDo: effectively chunk the spans adhering to j.maxMessageBytes
var messageChunks []*ProducerMessageChunks

var errs error
for _, batch := range batches {
for _, span := range batch.Spans {
span.Process = batch.Process
bts, err := j.marshaler.marshal(span)
// continue to process spans that can be serialized
if err != nil {
errs = multierr.Append(errs, err)
continue
Expand All @@ -43,7 +49,8 @@ func (j jaegerMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*sarama.
})
}
}
return messages, errs
messageChunks = append(messageChunks, &ProducerMessageChunks{messages})
return messageChunks, errs
}

func (j jaegerMarshaler) Encoding() string {
Expand Down
4 changes: 2 additions & 2 deletions exporter/kafkaexporter/jaeger_marshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func TestJaegerMarshaler(t *testing.T) {
}
for _, test := range tests {
t.Run(test.encoding, func(t *testing.T) {
messages, err := test.unmarshaler.Marshal(td, "topic")
msg, err := test.unmarshaler.Marshal(td, "topic")
require.NoError(t, err)
assert.Equal(t, test.messages, messages)
assert.Equal(t, test.messages, msg[0])
assert.Equal(t, test.encoding, test.unmarshaler.Encoding())
})
}
Expand Down
53 changes: 42 additions & 11 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"strings"

"github.com/IBM/sarama"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -42,20 +43,48 @@ func (ke kafkaErrors) Error() string {
}

func (e *kafkaTracesProducer) tracesPusher(ctx context.Context, td ptrace.Traces) error {
messages, err := e.marshaler.Marshal(td, getTopic(ctx, &e.cfg, td.ResourceSpans()))
messageChunks, err := e.marshaler.Marshal(td, getTopic(ctx, &e.cfg, td.ResourceSpans()))
if err != nil {
return consumererror.NewPermanent(err)
return consumererror.NewPermanent(
fmt.Errorf("failed to marshal trace data: %w", err),
)
}
err = e.producer.SendMessages(messages)
if err != nil {
var prodErr sarama.ProducerErrors
if errors.As(err, &prodErr) {
if len(prodErr) > 0 {
return kafkaErrors{len(prodErr), prodErr[0].Err.Error()}

var allErrors []string

for i, chunk := range messageChunks {
sendErr := e.producer.SendMessages(chunk.msg)
if sendErr == nil {
continue
}

var prodErrs sarama.ProducerErrors
if errors.As(sendErr, &prodErrs) {
for _, pErr := range prodErrs {
allErrors = append(allErrors,
fmt.Sprintf(
"chunk[%d] partition=%d offset=%d error=%v",
i,
pErr.Msg.Partition,
pErr.Msg.Offset,
pErr.Err,
),
)
}
} else {
allErrors = append(allErrors,
fmt.Sprintf("chunk[%d] error=%v", i, sendErr),
)
}
return err
}

if len(allErrors) > 0 {
return fmt.Errorf("encountered %d errors sending Kafka messages: %s",
len(allErrors),
strings.Join(allErrors, "; "),
)
}

return nil
}

Expand All @@ -73,8 +102,10 @@ func (e *kafkaTracesProducer) start(ctx context.Context, host component.Host) er
e.cfg.Encoding,
); errExt == nil {
e.marshaler = &tracesEncodingMarshaler{
marshaler: *marshaler,
encoding: e.cfg.Encoding,
marshaler: *marshaler,
encoding: e.cfg.Encoding,
partitionedByTraceID: e.cfg.PartitionTracesByID,
maxMessageBytes: e.cfg.Producer.MaxMessageBytes,
}
}
if marshaler, errInt := createTracesMarshaler(e.cfg); e.marshaler == nil && errInt == nil {
Expand Down
10 changes: 5 additions & 5 deletions exporter/kafkaexporter/kafka_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestTracesPusher(t *testing.T) {

p := kafkaTracesProducer{
producer: producer,
marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false),
marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false, 0),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
Expand All @@ -193,7 +193,7 @@ func TestTracesPusher_attr(t *testing.T) {
TopicFromAttribute: "kafka_topic",
},
producer: producer,
marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false),
marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false, 0),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
Expand All @@ -209,7 +209,7 @@ func TestTracesPusher_ctx(t *testing.T) {

p := kafkaTracesProducer{
producer: producer,
marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false),
marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false, 0),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
Expand All @@ -226,7 +226,7 @@ func TestTracesPusher_err(t *testing.T) {

p := kafkaTracesProducer{
producer: producer,
marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false),
marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false, 0),
logger: zap.NewNop(),
}
t.Cleanup(func() {
Expand Down Expand Up @@ -432,7 +432,7 @@ func (e metricsErrorMarshaler) Encoding() string {

var _ TracesMarshaler = (*tracesErrorMarshaler)(nil)

func (e tracesErrorMarshaler) Marshal(_ ptrace.Traces, _ string) ([]*sarama.ProducerMessage, error) {
func (e tracesErrorMarshaler) Marshal(_ ptrace.Traces, _ string) ([]*ProducerMessageChunks, error) {
return nil, e.err
}

Expand Down
34 changes: 24 additions & 10 deletions exporter/kafkaexporter/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv2"
)

// ProducerMessageChunks wraps one chunk of sarama producer messages
// that are sent together in a single SendMessages call.
type ProducerMessageChunks struct {
	// msg holds the messages belonging to this chunk.
	msg []*sarama.ProducerMessage
}

// TracesMarshaler marshals traces into Message array.
type TracesMarshaler interface {
// Marshal serializes spans into sarama's ProducerMessages
Marshal(traces ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error)
Marshal(traces ptrace.Traces, topic string) ([]*ProducerMessageChunks, error)

// Encoding returns encoding name
Encoding() string
Expand Down Expand Up @@ -45,20 +49,21 @@ type LogsMarshaler interface {
func createTracesMarshaler(config Config) (TracesMarshaler, error) {
encoding := config.Encoding
partitionTracesByID := config.PartitionTracesByID
maxMessageBytes := config.Producer.MaxMessageBytes

jaegerProto := jaegerMarshaler{marshaler: jaegerProtoSpanMarshaler{}}
jaegerJSON := jaegerMarshaler{marshaler: newJaegerJSONMarshaler()}

switch encoding {
case defaultEncoding:
return newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, partitionTracesByID), nil
return newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, partitionTracesByID, maxMessageBytes), nil
case "otlp_json":
return newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json", partitionTracesByID), nil
return newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json", partitionTracesByID, maxMessageBytes), nil
case "zipkin_proto":
return newPdataTracesMarshaler(zipkinv2.NewProtobufTracesMarshaler(), "zipkin_proto", partitionTracesByID), nil
return newPdataTracesMarshaler(zipkinv2.NewProtobufTracesMarshaler(), "zipkin_proto", partitionTracesByID, maxMessageBytes), nil
case "zipkin_json":
return newPdataTracesMarshaler(zipkinv2.NewJSONTracesMarshaler(), "zipkin_json", partitionTracesByID), nil
case jaegerProtoSpanMarshaler{}.encoding():
return newPdataTracesMarshaler(zipkinv2.NewJSONTracesMarshaler(), "zipkin_json", partitionTracesByID, maxMessageBytes), nil
case jaegerProto.Encoding():
return jaegerProto, nil
case jaegerJSON.Encoding():
return jaegerJSON, nil
Expand Down Expand Up @@ -101,12 +106,20 @@ func createLogMarshaler(config Config) (LogsMarshaler, error) {

// tracesEncodingMarshaler is a wrapper around ptrace.Marshaler that implements TracesMarshaler.
type tracesEncodingMarshaler struct {
	// marshaler encodes the whole trace payload.
	marshaler ptrace.Marshaler
	// encoding is the name reported by Encoding().
	encoding string
	// partitionedByTraceID selects per-trace partitioning
	// (not yet implemented in Marshal — see its ToDo).
	partitionedByTraceID bool
	// maxMessageBytes is the intended per-chunk size cap
	// (chunking is still a ToDo in Marshal).
	maxMessageBytes int
}

func (t *tracesEncodingMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) {
func (t *tracesEncodingMarshaler) Marshal(traces ptrace.Traces, topic string) ([]*ProducerMessageChunks, error) {
// ToDo: implement partitionedByTraceID

var messages []*sarama.ProducerMessage

// ToDo: effectively chunk the spans adhering to j.maxMessageBytes
var messageChunks []*ProducerMessageChunks

data, err := t.marshaler.MarshalTraces(traces)
if err != nil {
return nil, fmt.Errorf("failed to marshal traces: %w", err)
Expand All @@ -115,7 +128,8 @@ func (t *tracesEncodingMarshaler) Marshal(traces ptrace.Traces, topic string) ([
Topic: topic,
Value: sarama.ByteEncoder(data),
})
return messages, nil
messageChunks = append(messageChunks, &ProducerMessageChunks{msg: messages})
return messageChunks, nil
}

func (t *tracesEncodingMarshaler) Encoding() string {
Expand Down
2 changes: 1 addition & 1 deletion exporter/kafkaexporter/marshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) {
require.NoError(t, err, "Must have marshaled the data without error")
require.Len(t, msg, test.numExpectedMessages, "Expected number of messages in the message")

for idx, singleMsg := range msg {
for idx, singleMsg := range msg[0].msg {
data, err := singleMsg.Value.Encode()
require.NoError(t, err, "Must not error when encoding value")
require.NotNil(t, data, "Must have valid data to test")
Expand Down
14 changes: 11 additions & 3 deletions exporter/kafkaexporter/pdata_marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,14 @@ type pdataTracesMarshaler struct {
marshaler ptrace.Marshaler
encoding string
partitionedByTraceID bool
maxMessageBytes int
}

func (p *pdataTracesMarshaler) Marshal(td ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) {
func (p *pdataTracesMarshaler) Marshal(td ptrace.Traces, topic string) ([]*ProducerMessageChunks, error) {

// ToDo: effectively chunk the spans adhering to j.maxMessageBytes
var messageChunks []*ProducerMessageChunks

var msgs []*sarama.ProducerMessage
if p.partitionedByTraceID {
for _, trace := range batchpersignal.SplitTraces(td) {
Expand All @@ -152,17 +157,20 @@ func (p *pdataTracesMarshaler) Marshal(td ptrace.Traces, topic string) ([]*saram
})
}

return msgs, nil
messageChunks = append(messageChunks, &ProducerMessageChunks{msg: msgs})

return messageChunks, nil
}

// Encoding returns the configured encoding name for this marshaler.
func (p *pdataTracesMarshaler) Encoding() string {
	return p.encoding
}

func newPdataTracesMarshaler(marshaler ptrace.Marshaler, encoding string, partitionedByTraceID bool) TracesMarshaler {
func newPdataTracesMarshaler(marshaler ptrace.Marshaler, encoding string, partitionedByTraceID bool, maxMessageBytes int) TracesMarshaler {
return &pdataTracesMarshaler{
marshaler: marshaler,
encoding: encoding,
partitionedByTraceID: partitionedByTraceID,
maxMessageBytes: maxMessageBytes,
}
}

0 comments on commit cfe6488

Please sign in to comment.