Skip to content

Commit

Permalink
add project attribute, check span recording
Browse files Browse the repository at this point in the history
  • Loading branch information
hongalex committed Jun 25, 2024
1 parent 249e8ce commit b77a0f5
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 86 deletions.
96 changes: 48 additions & 48 deletions pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type messageIterator struct {
po *pullOptions
ps *pullStream
subc *vkit.SubscriberClient
projectID string
subID string
subName string
kaTick <-chan time.Time // keep-alive (deadline extensions)
Expand Down Expand Up @@ -145,14 +146,15 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt
cctx, cancel := context.WithCancel(context.Background())
cctx = withSubscriptionKey(cctx, subName)

subID := getIDFromFQN(subName)
projectID, subID := parseResourceName(subName)

it := &messageIterator{
ctx: cctx,
cancel: cancel,
ps: ps,
po: po,
subc: subc,
projectID: projectID,
subID: subID,
subName: subName,
kaTick: time.After(keepAlivePeriod),
Expand Down Expand Up @@ -331,15 +333,17 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
if m.Attributes != nil {
ctx = propagation.TraceContext{}.Extract(ctx, newMessageCarrier(m))
}
attr := getSubscriberOpts(it.subID, m)
attr := getSubscriberOpts(it.projectID, it.subID, m)
_, span := startSpan(ctx, subscribeSpanName, it.subID, attr...)
span.SetAttributes(
attribute.Bool(eosAttribute, it.enableExactlyOnceDelivery),
attribute.String(ackIDAttribute, ackID),
semconv.MessagingBatchMessageCount(len(msgs)),
semconv.CodeFunction("receive"),
)
it.activeSpans.Store(ackID, span)
if span.SpanContext().IsSampled() {
it.activeSpans.Store(ackID, span)
}
}
}
deadline := it.ackDeadline()
Expand Down Expand Up @@ -560,13 +564,13 @@ func (it *messageIterator) handleKeepAlives() {
delete(it.keepAliveDeadlines, id)
if it.enableTracing {
// get the parent span context for this ackID for otel tracing.
// This message is now expired, so if the ackID is still valid,
// mark that span as expired and end the span.
s, ok := it.activeSpans.LoadAndDelete(id)
if ok {
span := s.(trace.Span)
span.SetAttributes(attribute.String(resultAttribute, resultExpired))
span.End()
} else {
log.Printf("pubsub: handleKeepAlives failed to load ackID(%s) from activeSpans map", id)
}
}
} else {
Expand Down Expand Up @@ -639,35 +643,33 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) {
s, ok := it.activeSpans.LoadAndDelete(ackID)
if ok {
subscribeSpan := s.(trace.Span)
if subscribeSpan.SpanContext().IsSampled() {
defer subscribeSpan.End()
defer subscribeSpan.SetAttributes(attribute.String(resultAttribute, resultAcked))
subscribeSpans = append(subscribeSpans, subscribeSpan)
subscribeSpan.AddEvent(eventAckStart, trace.WithAttributes(semconv.MessagingBatchMessageCount(len(ackIDs))))
defer subscribeSpan.AddEvent(eventAckEnd)
links = append(links, trace.Link{SpanContext: subscribeSpan.SpanContext()})
}
} else {
log.Printf("pubsub: sendAck failed to load ackID(%s) from activeSpans map", ackID)
defer subscribeSpan.End()
defer subscribeSpan.SetAttributes(attribute.String(resultAttribute, resultAcked))
subscribeSpans = append(subscribeSpans, subscribeSpan)
subscribeSpan.AddEvent(eventAckStart, trace.WithAttributes(semconv.MessagingBatchMessageCount(len(ackIDs))))
defer subscribeSpan.AddEvent(eventAckEnd)
links = append(links, trace.Link{SpanContext: subscribeSpan.SpanContext()})
}
}
}
var ackSpan trace.Span
if it.enableTracing {
opts := getCommonOptions(it.subID)
opts := getCommonOptions(it.projectID, it.subID)
opts = append(opts, trace.WithLinks(links...))
ctx, ackSpan = startSpan(context.Background(), ackSpanName, it.subID, opts...)
defer ackSpan.End()
ackSpan.SetAttributes(semconv.MessagingBatchMessageCount(len(ackIDs)),
semconv.CodeFunction("sendAck"))

for _, s := range subscribeSpans {
s.AddLink(trace.Link{
SpanContext: ackSpan.SpanContext(),
Attributes: []attribute.KeyValue{
semconv.MessagingOperationName(ackSpanName),
},
})
if ackSpan.SpanContext().IsSampled() {
for _, s := range subscribeSpans {
s.AddLink(trace.Link{
SpanContext: ackSpan.SpanContext(),
Attributes: []attribute.KeyValue{
semconv.MessagingOperationName(ackSpanName),
},
})
}
}
}
return it.subc.Acknowledge(ctx, &pb.AcknowledgeRequest{
Expand Down Expand Up @@ -701,8 +703,8 @@ func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Dur
it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIDs []string) error {
links := make([]trace.Link, 0, len(ackIDs))
subscribeSpans := make([]trace.Span, 0, len(ackIDs))
for _, ackID := range ackIDs {
if it.enableTracing {
if it.enableTracing {
for _, ackID := range ackIDs {
// get the parent span context for this ackID for otel tracing.
var s any
var ok bool
Expand All @@ -711,44 +713,42 @@ func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Dur
} else {
s, ok = it.activeSpans.Load(ackID)
}
if !ok {
// This should never happen since this means the ackID was dropped early.
log.Printf("pubsub: sendModAck failed to load ackID(%s) from activeSpans map", ackID)
continue
}
subscribeSpan := s.(trace.Span)
subscribeSpans = append(subscribeSpans, subscribeSpan)
if isNack {
defer subscribeSpan.End()
defer subscribeSpan.SetAttributes(attribute.String(resultAttribute, resultNacked))
}
subscribeSpan.AddEvent(eventStart, trace.WithAttributes(semconv.MessagingBatchMessageCount(len(ackIDs))))
defer subscribeSpan.AddEvent(eventEnd)
if subscribeSpan.IsRecording() {
if ok {
subscribeSpan := s.(trace.Span)
subscribeSpans = append(subscribeSpans, subscribeSpan)
if isNack {
defer subscribeSpan.End()
defer subscribeSpan.SetAttributes(attribute.String(resultAttribute, resultNacked))
}
subscribeSpan.AddEvent(eventStart, trace.WithAttributes(semconv.MessagingBatchMessageCount(len(ackIDs))))
defer subscribeSpan.AddEvent(eventEnd)
links = append(links, trace.Link{SpanContext: subscribeSpan.SpanContext()})
}
}
}
var mSpan trace.Span
if it.enableTracing {
opts := getCommonOptions(it.subID)
opts := getCommonOptions(it.projectID, it.subID)
opts = append(opts, trace.WithLinks(links...))
ctx, mSpan = startSpan(context.Background(), modackSpanName, it.subID, opts...)
defer mSpan.End()
if !isNack {
mSpan.SetAttributes(
attribute.Int(ackDeadlineSecAttribute, int(deadlineSec)),
semconv.MessagingGCPPubsubMessageAckDeadline(int(deadlineSec)),
attribute.Bool(receiptModackAttribute, isReceipt))
}
mSpan.SetAttributes(semconv.MessagingBatchMessageCount(len(ackIDs)),
semconv.CodeFunction("sendModAck"))
for _, s := range subscribeSpans {
s.AddLink(trace.Link{
SpanContext: mSpan.SpanContext(),
Attributes: []attribute.KeyValue{
semconv.MessagingOperationName(modackSpanName),
},
})

if mSpan.SpanContext().IsSampled() {
for _, s := range subscribeSpans {
s.AddLink(trace.Link{
SpanContext: mSpan.SpanContext(),
Attributes: []attribute.KeyValue{
semconv.MessagingOperationName(modackSpanName),
},
})
}
}
}

Expand Down
12 changes: 8 additions & 4 deletions pubsub/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,13 @@ func newExactlyOnceBackoff() gax.Backoff {
}
}

// retrieves the resource ID from a fully qualified name.
// For example, "projects/p/topics/my-topic" -> "my-topic"
func getIDFromFQN(fqn string) string {
// parseResourceName parses the project and resource ID from a fully qualified name.
// For example, "projects/p/topics/my-topic" -> "p", "my-topic"
func parseResourceName(fqn string) (string, string) {
s := strings.Split(fqn, "/")
return s[len(s)-1]
// Some tests don't use FQN, in which case return empty projectID.
if len(s) == 1 {
return "", s[0]
}
return s[1], s[len(s)-1]
}
13 changes: 4 additions & 9 deletions pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"errors"
"fmt"
"io"
"log"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -57,7 +56,7 @@ type Subscription struct {

// enableTracing enable OTel tracing of Pub/Sub messages on this subscription.
// This is configured at client instantiation, and allows
// dsabling of tracing even when a tracer provider is detectd.
// dsabling of tracing even when a tracer provider is detected.
enableTracing bool
}

Expand Down Expand Up @@ -1406,10 +1405,8 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
if ok {
sc := c.(trace.Span)
otelCtx = trace.ContextWithSpanContext(otelCtx, sc.SpanContext())
} else {
log.Printf("pubsub: subscriber concurrency control failed to load ackID(%s) from activeSpans map", ackh.ackID)
_, ccSpan = startSpan(otelCtx, ccSpanName, "")
}
_, ccSpan = startSpan(otelCtx, ccSpanName, "")
}
// Use the original user defined ctx for this operation so the acquire operation can be cancelled.
if err := fc.acquire(ctx, len(msg.Data)); err != nil {
Expand All @@ -1420,7 +1417,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
// Return nil if the context is done, not err.
return nil
}
if iter.enableTracing {
if iter.enableTracing && ccSpan.IsRecording() {
ccSpan.End()
}

Expand All @@ -1442,12 +1439,10 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
msgLen := len(msg.Data)
if err := sched.Add(key, msg, func(msg interface{}) {
m := msg.(*Message)
if iter.enableTracing {
schedulerSpan.End()
}
defer wg.Done()
var ps trace.Span
if iter.enableTracing {
schedulerSpan.End()
otelCtx, ps = startSpan(otelCtx, processSpanName, s.ID())
old := ackh.doneFunc
ackh.doneFunc = func(ackID string, ack bool, r *ipubsub.AckResult, receiveTime time.Time) {
Expand Down
23 changes: 13 additions & 10 deletions pubsub/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,8 @@ var errTopicOrderingNotEnabled = errors.New("Topic.EnableMessageOrdering=false,
func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
var createSpan trace.Span
if t.enableTracing {
ctx, createSpan = startCreateSpan(ctx, msg, t.ID())
opts := getPublishSpanAttributes(t.c.projectID, t.ID(), msg)
ctx, createSpan = startSpan(ctx, createSpanName, t.ID(), opts...)
createSpan.SetAttributes(semconv.CodeFunction("Publish"))
}
ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name))
Expand Down Expand Up @@ -965,23 +966,25 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage)
links = append(links, trace.Link{SpanContext: bm.createSpan.SpanContext()})
}

topicID := getIDFromFQN(t.name)
projectID, topicID := parseResourceName(t.name)
var pSpan trace.Span
opts := getCommonOptions(topicID)
opts := getCommonOptions(projectID, topicID)
// Add link to publish RPC span of createSpan(s).
opts = append(opts, trace.WithLinks(links...))
ctx, pSpan = startSpan(ctx, publishRPCSpanName, topicID, opts...)
pSpan.SetAttributes(semconv.MessagingBatchMessageCount(numMsgs), semconv.CodeFunction("publishMessageBundle"))
defer pSpan.End()

// Add the reverse link to createSpan(s) of publish RPC span.
for _, bm := range bms {
bm.createSpan.AddLink(trace.Link{
SpanContext: pSpan.SpanContext(),
Attributes: []attribute.KeyValue{
semconv.MessagingOperationName(publishRPCSpanName),
},
})
if pSpan.SpanContext().IsSampled() {
for _, bm := range bms {
bm.createSpan.AddLink(trace.Link{
SpanContext: pSpan.SpanContext(),
Attributes: []attribute.KeyValue{
semconv.MessagingOperationName(publishRPCSpanName),
},
})
}
}
}
var batchSize int
Expand Down
22 changes: 8 additions & 14 deletions pubsub/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,6 @@ func injectPropagation(ctx context.Context, msg *Message) {
}

const (
pubsubSemConvName = "gcp_pubsub"

// publish span names
createSpanName = "create"
publishFCSpanName = "publisher flow control"
Expand Down Expand Up @@ -352,13 +350,13 @@ const (
resultExpired = "expired"

// custom pubsub specific attributes
gcpProjectIDAttribute = "gcp.project_id"
pubsubPrefix = "messaging.gcp_pubsub."
orderingAttribute = pubsubPrefix + "message.ordering_key"
deliveryAttemptAttribute = pubsubPrefix + "message.delivery_attempt"
eosAttribute = pubsubPrefix + "exactly_once_delivery"
ackIDAttribute = pubsubPrefix + "message.ack_id"
resultAttribute = pubsubPrefix + "result"
ackDeadlineSecAttribute = pubsubPrefix + "ack_deadline_seconds"
receiptModackAttribute = pubsubPrefix + "is_receipt_modack"
)

Expand All @@ -370,12 +368,7 @@ func startSpan(ctx context.Context, spanType, resourceID string, opts ...trace.S
return tracer().Start(ctx, spanName, opts...)
}

func startCreateSpan(ctx context.Context, m *Message, topicID string) (context.Context, trace.Span) {
opts := getPublishSpanAttributes(topicID, m)
return tracer().Start(ctx, fmt.Sprintf("%s %s", topicID, createSpanName), opts...)
}

func getPublishSpanAttributes(dst string, msg *Message, attrs ...attribute.KeyValue) []trace.SpanStartOption {
func getPublishSpanAttributes(project, dst string, msg *Message, attrs ...attribute.KeyValue) []trace.SpanStartOption {
opts := []trace.SpanStartOption{
trace.WithAttributes(
semconv.MessagingMessageID(msg.ID),
Expand All @@ -385,11 +378,11 @@ func getPublishSpanAttributes(dst string, msg *Message, attrs ...attribute.KeyVa
trace.WithAttributes(attrs...),
trace.WithSpanKind(trace.SpanKindProducer),
}
opts = append(opts, getCommonOptions(dst)...)
opts = append(opts, getCommonOptions(project, dst)...)
return opts
}

func getSubscriberOpts(dst string, msg *Message, attrs ...attribute.KeyValue) []trace.SpanStartOption {
func getSubscriberOpts(project, dst string, msg *Message, attrs ...attribute.KeyValue) []trace.SpanStartOption {
opts := []trace.SpanStartOption{
trace.WithAttributes(
semconv.MessagingMessageID(msg.ID),
Expand All @@ -402,14 +395,15 @@ func getSubscriberOpts(dst string, msg *Message, attrs ...attribute.KeyValue) []
if msg.DeliveryAttempt != nil {
opts = append(opts, trace.WithAttributes(attribute.Int(deliveryAttemptAttribute, *msg.DeliveryAttempt)))
}
opts = append(opts, getCommonOptions(dst)...)
opts = append(opts, getCommonOptions(project, dst)...)
return opts
}

func getCommonOptions(destination string) []trace.SpanStartOption {
func getCommonOptions(projectID, destination string) []trace.SpanStartOption {
opts := []trace.SpanStartOption{
trace.WithAttributes(
semconv.MessagingSystemKey.String(pubsubSemConvName),
attribute.String(gcpProjectIDAttribute, projectID),
semconv.MessagingSystemGCPPubsub,
semconv.MessagingDestinationName(destination),
),
}
Expand Down
Loading

0 comments on commit b77a0f5

Please sign in to comment.