From b1dead750d982658927d8aa78d85ae4c058f7945 Mon Sep 17 00:00:00 2001 From: Lam Tran Date: Wed, 21 Feb 2024 21:38:07 +0700 Subject: [PATCH 1/2] chore(checkoutservice): add producer interceptor for tracing --- .../kafka/trace_interceptor.go | 1 + src/checkoutservice/kafka/producer.go | 1 + .../kafka/trace_interceptor.go | 60 +++++++++++++++++++ src/checkoutservice/main.go | 35 +---------- 4 files changed, 64 insertions(+), 33 deletions(-) create mode 100644 src/checkoutservice/kafka/trace_interceptor.go diff --git a/src/accountingservice/kafka/trace_interceptor.go b/src/accountingservice/kafka/trace_interceptor.go index 16899a5891..5e2bf1254e 100644 --- a/src/accountingservice/kafka/trace_interceptor.go +++ b/src/accountingservice/kafka/trace_interceptor.go @@ -28,6 +28,7 @@ func NewOTelInterceptor(groupID string) *OTelInterceptor { oi.fixedAttrs = []attribute.KeyValue{ semconv.MessagingSystemKafka, + semconv.MessagingOperationReceive, semconv.MessagingKafkaConsumerGroup(groupID), semconv.NetworkTransportTCP, } diff --git a/src/checkoutservice/kafka/producer.go b/src/checkoutservice/kafka/producer.go index 6ee773d8c2..0d9ea5684a 100644 --- a/src/checkoutservice/kafka/producer.go +++ b/src/checkoutservice/kafka/producer.go @@ -17,6 +17,7 @@ func CreateKafkaProducer(brokers []string, log *logrus.Logger) (sarama.AsyncProd saramaConfig.Version = ProtocolVersion // So we can know the partition and offset of messages. saramaConfig.Producer.Return.Successes = true + saramaConfig.Producer.Interceptors = []sarama.ProducerInterceptor{NewOTelInterceptor()} producer, err := sarama.NewAsyncProducer(brokers, saramaConfig) if err != nil { diff --git a/src/checkoutservice/kafka/trace_interceptor.go b/src/checkoutservice/kafka/trace_interceptor.go new file mode 100644 index 0000000000..63b4c3cdcc --- /dev/null +++ b/src/checkoutservice/kafka/trace_interceptor.go @@ -0,0 +1,60 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +package kafka + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" + "go.opentelemetry.io/otel/trace" + + "github.com/IBM/sarama" +) + +type OTelInterceptor struct { + tracer trace.Tracer + fixedAttrs []attribute.KeyValue +} + +// NewOTelInterceptor processes span for intercepted messages and add some +// headers with the span data. +func NewOTelInterceptor() *OTelInterceptor { + oi := OTelInterceptor{} + oi.tracer = otel.Tracer("github.com/open-telemetry/opentelemetry-demo/checkoutservice/sarama") + + oi.fixedAttrs = []attribute.KeyValue{ + semconv.MessagingSystemKafka, + semconv.MessagingOperationPublish, + semconv.NetworkTransportTCP, + } + return &oi +} + +func (oi *OTelInterceptor) OnSend(msg *sarama.ProducerMessage) { + spanContext, span := oi.tracer.Start( + context.Background(), + fmt.Sprintf("%s publish", msg.Topic), + trace.WithSpanKind(trace.SpanKindProducer), + trace.WithAttributes( + semconv.PeerService("kafka"), + semconv.NetworkTransportTCP, + semconv.MessagingSystemKafka, + semconv.MessagingDestinationName(msg.Topic), + semconv.MessagingOperationPublish, + semconv.MessagingKafkaDestinationPartition(int(msg.Partition)), + ), + ) + defer span.End() + + carrier := propagation.MapCarrier{} + propagator := otel.GetTextMapPropagator() + propagator.Inject(spanContext, carrier) + + for key, value := range carrier { + msg.Headers = append(msg.Headers, sarama.RecordHeader{Key: []byte(key), Value: []byte(value)}) + } +} diff --git a/src/checkoutservice/main.go b/src/checkoutservice/main.go index 687b7e307b..35119e4eb3 100644 --- a/src/checkoutservice/main.go +++ b/src/checkoutservice/main.go @@ -7,7 +7,6 @@ import ( "context" "encoding/json" "fmt" - semconv "go.opentelemetry.io/otel/semconv/v1.24.0" "net" "net/http" "os" @@ -311,7 +310,7 @@ func (cs *checkoutService) PlaceOrder(ctx context.Context, req *pb.PlaceOrderReq // send to kafka only if kafka broker address is set if cs.kafkaBrokerSvcAddr != "" { - cs.sendToPostProcessor(ctx, orderResult) + cs.sendToPostProcessor(orderResult) } resp := &pb.PlaceOrderResponse{Order: orderResult} @@ -474,7 +473,7 @@ func (cs *checkoutService) shipOrder(ctx context.Context, address *pb.Address, i return resp.GetTrackingId(), nil } -func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.OrderResult) { +func (cs *checkoutService) sendToPostProcessor(result *pb.OrderResult) { message, err := proto.Marshal(result) if err != nil { log.Errorf("Failed to marshal message to protobuf: %+v", err) @@ -486,37 +485,7 @@ func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.O Value: sarama.ByteEncoder(message), } - // Inject tracing info into message - span := createProducerSpan(ctx, &msg) - defer span.End() - cs.KafkaProducerClient.Input() <- &msg successMsg := <-cs.KafkaProducerClient.Successes() log.Infof("Successful to write message. offset: %v", successMsg.Offset) } - -func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.Span { - spanContext, span := tracer.Start( - ctx, - fmt.Sprintf("%s publish", msg.Topic), - trace.WithSpanKind(trace.SpanKindProducer), - trace.WithAttributes( - semconv.PeerService("kafka"), - semconv.NetworkTransportTCP, - semconv.MessagingSystemKafka, - semconv.MessagingDestinationName(msg.Topic), - semconv.MessagingOperationPublish, - semconv.MessagingKafkaDestinationPartition(int(msg.Partition)), - ), - ) - - carrier := propagation.MapCarrier{} - propagator := otel.GetTextMapPropagator() - propagator.Inject(spanContext, carrier) - - for key, value := range carrier { - msg.Headers = append(msg.Headers, sarama.RecordHeader{Key: []byte(key), Value: []byte(value)}) - } - - return span -} From 61133b58d7f3d13853e365246592593a8149e809 Mon Sep 17 00:00:00 2001 From: Lam Tran Date: Wed, 21 Feb 2024 23:03:09 +0700 Subject: [PATCH 2/2] chore(checkoutservice): update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 226ac01d5d..1885146d48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ the release. ## Unreleased +* [checkoutservice] add producer interceptor for tracing + ([#1400](https://github.com/open-telemetry/opentelemetry-demo/pull/1400)) * [chore] increase memory for Collector and Jaeger ([#1396](https://github.com/open-telemetry/opentelemetry-demo/pull/1396)) * [chore] fix Make targets for restart and redeploy