Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): add opentelemetry tracing for publish #8317

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
c439058
feat(pubsub): add publish tracing
hongalex Jul 20, 2023
217ab44
refactor tests to use InMemoryExporter
hongalex Jul 21, 2023
128c6b1
update copyright year
hongalex Jul 21, 2023
17f41b0
Merge branch 'main' of github.com:googleapis/google-cloud-go into pub…
hongalex Jul 24, 2023
d4a9ef3
run go mod tidy
hongalex Jul 24, 2023
229c80e
rename PubsubMessageCarrier -> MessageCarrier, fix race
hongalex Jul 27, 2023
e1fe160
make messageCarrier not exported
hongalex Aug 3, 2023
c2bee2a
downgrade otel sdk version to support go1.17
hongalex Aug 3, 2023
0f03c49
add short delay to ensure spans are exported
hongalex Aug 3, 2023
47693c4
merge with slush branch
hongalex Aug 4, 2023
247465a
context renaming
hongalex Aug 16, 2023
f3938f2
Merge branch 'main' of github.com:googleapis/google-cloud-go into pub…
hongalex Aug 18, 2023
ecda568
chore(ci): fix yaml syntax (#8448)
noahdietz Aug 18, 2023
aa13d97
chore: release main (#8445)
release-please[bot] Aug 21, 2023
28af108
chore(notebooks): add config to generate apiv2 (#8454)
quartzmo Aug 21, 2023
1859e6c
feat(notebooks): clients for Notebooks API V2 (#8455)
gcf-owl-bot[bot] Aug 21, 2023
7efc8a3
chore(ci): find changes on most recent main commit (#8457)
noahdietz Aug 21, 2023
3cbf84e
chore: add datastore to allowlist of things to generate still (#8458)
codyoss Aug 21, 2023
6b6a69c
chore: release main (#8456)
release-please[bot] Aug 21, 2023
fa6e827
fix(bigquery): value for datasetID on foreign keys (#8447)
alvarowolfx Aug 22, 2023
a9fff18
feat(datastore): SUM and AVG aggregations (#8307)
bhshkh Aug 22, 2023
37681ff
feat(datastore): Support aggregation query in transaction (#8439)
bhshkh Aug 22, 2023
911f31e
chore(main): release datastore 1.14.0 (#8351)
release-please[bot] Aug 22, 2023
ca01559
add helper functions for creating publish spans
hongalex Aug 23, 2023
567ed2d
refactor error tests
hongalex Aug 23, 2023
6c21558
fix(spanner): Transaction was started in a different session (#8467)
olavloite Aug 23, 2023
3dda7b2
feat(spanner/spannertest): support INSERT DML (#7820)
govargo Aug 23, 2023
14aa857
test(datastore): Truncate transaction read time to millisecond (#8473)
bhshkh Aug 23, 2023
6a45f26
feat(auth): add base auth package (#8465)
codyoss Aug 23, 2023
2c65345
test(datastore): Correct read time before creating entities (#8475)
bhshkh Aug 23, 2023
d773366
chore(main): release spanner 1.49.0 (#8468)
release-please[bot] Aug 24, 2023
8c1a363
chore(ci): increase new-client fetch depth (#8483)
noahdietz Aug 24, 2023
3789882
feat(spanner/spansql): add support for bit functions, sequence functi…
Aug 24, 2023
3d8c2c8
test(storage): increase timeout for requester pays test (#8421)
BrennaEpp Aug 24, 2023
236904e
test(pubsublite): fix flaky TestRoutingPublisherUnloadIdlePublisher (…
tmdiep Aug 24, 2023
ee274c7
test(pubsublite): fix flaky TestIntegration_SeekSubscription (#8479)
tmdiep Aug 24, 2023
c90dd00
feat(spanner): allow non-default service accounts (#8488)
ko3a4ok Aug 25, 2023
ccd0205
feat(spanner/spansql): add support for SEQUENCE statements (#8481)
Aug 25, 2023
d2a9d82
chore(config): add config to generate apiv1 (#8490)
julieqiu Aug 25, 2023
20725c8
feat(container): add support for NodeConfig Update (#8461)
gcf-owl-bot[bot] Aug 25, 2023
9874485
feat(config): new clients (#8493)
yoshi-code-bot Aug 25, 2023
d440d75
feat(spanner/spansql): add support for aggregate functions (#8498)
Aug 28, 2023
2fef801
fix exactly once delivery test
hongalex Aug 28, 2023
38a040e
fix(pubsub): make AckWithResult return success when constructed (#8489)
hongalex Aug 28, 2023
098ffa5
pull in changes from main
hongalex Aug 28, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pubsub/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ require (
github.com/google/go-cmp v0.5.9
github.com/googleapis/gax-go/v2 v2.11.0
go.opencensus.io v0.24.0
go.opentelemetry.io/otel v1.15.1
go.opentelemetry.io/otel/sdk v1.14.0
go.opentelemetry.io/otel/trace v1.15.1
golang.org/x/oauth2 v0.8.0
golang.org/x/sync v0.2.0
golang.org/x/time v0.3.0
Expand All @@ -23,6 +26,8 @@ require (
require (
cloud.google.com/go/compute v1.19.3 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
Expand Down
15 changes: 15 additions & 0 deletions pubsub/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand All @@ -30,6 +31,11 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
Expand Down Expand Up @@ -69,6 +75,7 @@ github.com/googleapis/enterprise-certificate-proxy v0.2.3/go.mod h1:AwSRAtLfXpU5
github.com/googleapis/gax-go/v2 v2.11.0 h1:9V9PWXEsWnPpQhu/PeQIkS4eGzMlTLGgt80cUUI8Ki4=
github.com/googleapis/gax-go/v2 v2.11.0/go.mod h1:DxmR61SGKkGLa2xigwuZIQpkCI2S5iydzRfb3peWZJI=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand All @@ -80,9 +87,16 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/otel v1.15.1 h1:3Iwq3lfRByPaws0f6bU3naAqOR1n5IeDWd9390kWHa8=
go.opentelemetry.io/otel v1.15.1/go.mod h1:mHHGEHVDLal6YrKMmk9LqC4a3sF5g+fHfrttQIB1NTc=
go.opentelemetry.io/otel/sdk v1.14.0 h1:PDCppFRDq8A1jL9v6KMI6dYesaq+DFcDZvjsoGvxGzY=
go.opentelemetry.io/otel/sdk v1.14.0/go.mod h1:bwIC5TjrNG6QDCHNWvW4HLHtUQ4I+VQDsnjhvyZCALM=
go.opentelemetry.io/otel/trace v1.15.1 h1:uXLo6iHJEzDfrNC0L0mNjItIp06SyaBQxu5t3xMlngY=
go.opentelemetry.io/otel/trace v1.15.1/go.mod h1:IWdQG/5N1x7f6YUlmdLeJvH9yxtuJAfc4VW5Agv9r/8=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down Expand Up @@ -198,6 +212,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
72 changes: 65 additions & 7 deletions pubsub/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ import (
gax "github.com/googleapis/gax-go/v2"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelcodes "go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/api/support/bundler"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -567,6 +572,8 @@ var errTopicOrderingNotEnabled = errors.New("Topic.EnableMessageOrdering=false,
// need to be stopped by calling t.Stop(). Once stopped, future calls to Publish
// will immediately return a PublishResult with an error.
func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
opts := getPublishSpanAttributes(t.String(), msg)
ctx, span := tracer().Start(ctx, fmt.Sprintf("%s %s", t.String(), publisherSpanName), opts...)
ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name))
if err != nil {
log.Printf("pubsub: cannot create context with tag in Publish: %v", err)
Expand All @@ -575,6 +582,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
r := ipubsub.NewPublishResult()
if !t.EnableMessageOrdering && msg.OrderingKey != "" {
ipubsub.SetPublishResult(r, "", errTopicOrderingNotEnabled)
spanRecordError(span, errTopicOrderingNotEnabled)
return r
}

Expand All @@ -585,26 +593,51 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
Attributes: msg.Attributes,
OrderingKey: msg.OrderingKey,
})
span.SetAttributes(semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize))

t.initBundler()
t.mu.RLock()
defer t.mu.RUnlock()
// TODO(aboulhosn) [from bcmills] consider changing the semantics of bundler to perform this logic so we don't have to do it here
if t.stopped {
ipubsub.SetPublishResult(r, "", ErrTopicStopped)
spanRecordError(span, ErrTopicStopped)
return r
}

ctx2, fcSpan := tracer().Start(ctx, publishFlowControlSpanName)
if err := t.flowController.acquire(ctx, msgSize); err != nil {
t.scheduler.Pause(msg.OrderingKey)
ipubsub.SetPublishResult(r, "", err)
spanRecordError(span, err)
return r
}
err = t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r, msgSize}, msgSize)
if err != nil {
fcSpan.End()

_, batchSpan := tracer().Start(ctx2, publishSchedulerSpanName)

bmsg := &bundledMessage{
msg: msg,
res: r,
size: msgSize,
span: span,
batchSpan: batchSpan,
}

if span.SpanContext().IsValid() {
if msg.Attributes == nil {
msg.Attributes = make(map[string]string)
}
// Inject the context from the first publish span rather than from flow control / batching.
otel.GetTextMapPropagator().Inject(ctx, newMessageCarrier(msg))
}

if err := t.scheduler.Add(msg.OrderingKey, bmsg, msgSize); err != nil {
t.scheduler.Pause(msg.OrderingKey)
ipubsub.SetPublishResult(r, "", err)
spanRecordError(span, err)
}

return r
}

Expand Down Expand Up @@ -634,6 +667,10 @@ type bundledMessage struct {
msg *Message
res *PublishResult
size int
// span is the entire publish span (from user calling Publish to the publish RPC resolving).
span trace.Span
// batchSpan traces the message batching operation in publish scheduler.
batchSpan trace.Span
}

func (t *Topic) initBundler() {
Expand Down Expand Up @@ -662,13 +699,17 @@ func (t *Topic) initBundler() {

t.scheduler = scheduler.NewPublishScheduler(workers, func(bundle interface{}) {
// TODO(jba): use a context detached from the one passed to NewClient.
ctx := context.TODO()
ctx2 := context.TODO()
if timeout != 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, timeout)
ctx2, cancel = context.WithTimeout(ctx2, timeout)
defer cancel()
}
t.publishMessageBundle(ctx, bundle.([]*bundledMessage))
bmsgs := bundle.([]*bundledMessage)
for _, m := range bmsgs {
m.batchSpan.End()
}
t.publishMessageBundle(ctx2, bmsgs)
})
t.scheduler.DelayThreshold = t.PublishSettings.DelayThreshold
t.scheduler.BundleCountThreshold = t.PublishSettings.CountThreshold
Expand Down Expand Up @@ -721,17 +762,31 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage)
if err != nil {
log.Printf("pubsub: cannot create context with tag in publishMessageBundle: %v", err)
}
pbMsgs := make([]*pb.PubsubMessage, len(bms))
numMsgs := len(bms)
pbMsgs := make([]*pb.PubsubMessage, numMsgs)
var orderingKey string
if numMsgs != 0 {
// extract the ordering key for this batch. since
// messages in the same batch share the same ordering
// key, it doesn't matter which we read from.
orderingKey = bms[0].msg.OrderingKey
}
for i, bm := range bms {
orderingKey = bm.msg.OrderingKey
pbMsgs[i] = &pb.PubsubMessage{
Data: bm.msg.Data,
Attributes: bm.msg.Attributes,
OrderingKey: bm.msg.OrderingKey,
}
if bm.msg.Attributes != nil {
ctx = otel.GetTextMapPropagator().Extract(ctx, newMessageCarrier(bm.msg))
}
_, pSpan := tracer().Start(ctx, publishRPCSpanName)
pSpan.SetAttributes(attribute.Int(numBatchedMessagesAttribute, numMsgs))
defer bm.span.End()
defer pSpan.End()
bm.msg = nil // release bm.msg for GC
}

var res *pb.PublishResponse
start := time.Now()
if orderingKey != "" && t.scheduler.IsPaused(orderingKey) {
Expand Down Expand Up @@ -766,8 +821,11 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage)
t.flowController.release(ctx, bm.size)
if err != nil {
ipubsub.SetPublishResult(bm.res, "", err)
bm.span.RecordError(err)
bm.span.SetStatus(otelcodes.Error, err.Error())
} else {
ipubsub.SetPublishResult(bm.res, res.MessageIds[i], nil)
bm.span.SetAttributes(semconv.MessagingMessageIDKey.String(res.MessageIds[i]))
}
}
}
Expand Down
90 changes: 90 additions & 0 deletions pubsub/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,18 @@ import (
"log"
"sync"

"cloud.google.com/go/pubsub/internal"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/protobuf/proto"
)

// The following keys are used to tag requests with a specific topic/subscription ID.
Expand Down Expand Up @@ -256,3 +265,84 @@ func withSubscriptionKey(ctx context.Context, subName string) context.Context {
func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
stats.Record(ctx, m.M(n))
}

const defaultTracerName = "cloud.google.com/go/pubsub"

func tracer() trace.Tracer {
return otel.Tracer(defaultTracerName, trace.WithInstrumentationVersion(internal.Version))
}

var _ propagation.TextMapCarrier = (*messageCarrier)(nil)

// messageCarrier injects and extracts traces from a pubsub.Message.
type messageCarrier struct {
msg *Message
}

// newMessageCarrier creates a new PubsubMessageCarrier.
func newMessageCarrier(msg *Message) messageCarrier {
return messageCarrier{msg: msg}
}

// Get retrieves a single value for a given key.
func (c messageCarrier) Get(key string) string {
return c.msg.Attributes["googclient_"+key]
}

// Set sets an attribute.
func (c messageCarrier) Set(key, val string) {
c.msg.Attributes["googclient_"+key] = val
}

// Keys returns a slice of all keys in the carrier.
func (c messageCarrier) Keys() []string {
i := 0
out := make([]string, len(c.msg.Attributes))
for k := range c.msg.Attributes {
out[i] = k
i++
}
return out
}

const (
// span names
publisherSpanName = "send"
publishFlowControlSpanName = "publisher flow control"
publishSchedulerSpanName = "publish scheduler"
publishRPCSpanName = "send Publish"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spec doc seems to imply that this should be just "publish"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this should just be Publish, I went back and forth on this for a bit.


// custom pubsub specific attributes
numBatchedMessagesAttribute = "messaging.pubsub.num_messages_in_batch"
orderingAttribute = "messaging.pubsub.ordering_key"
)

func getPublishSpanAttributes(topic string, msg *Message, opts ...attribute.KeyValue) []trace.SpanStartOption {
// TODO(hongalex): benchmark this to make sure no significant performance degradation
// when calculating proto.Size in receive paths.
// TODO(hongalex): find way to incorporate pubsub client library version in attribute.
msgSize := proto.Size(&pb.PubsubMessage{
Data: msg.Data,
Attributes: msg.Attributes,
OrderingKey: msg.OrderingKey,
})
ss := []trace.SpanStartOption{
trace.WithAttributes(
semconv.MessagingSystemKey.String("pubsub"),
semconv.MessagingDestinationKey.String(topic),
semconv.MessagingDestinationKindTopic,
semconv.MessagingMessageIDKey.String(msg.ID),
semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize),
attribute.String(orderingAttribute, msg.OrderingKey),
),
trace.WithAttributes(opts...),
trace.WithSpanKind(trace.SpanKindProducer),
}
return ss
}

func spanRecordError(span trace.Span, err error) {
span.RecordError(err)
span.SetStatus(otelcodes.Error, err.Error())
span.End()
}
Loading