Skip to content

Commit

Permalink
feat(pubsub): extract trace information
Browse files Browse the repository at this point in the history
Pass a context that contains trace information extracted from the
incoming message, allowing traces to span from publishers over to
subscribers.

In order for publishers to include tracing information, the pubsub
client should be created with:
```go
client, err := pubsub.NewClientWithConfig(
	ctx,
	cloudrunner.Runtime(ctx).ProjectID,
	&pubsub.ClientConfig{
		EnableOpenTelemetryTracing: true,
	},
```

Depends on googleapis/google-cloud-go#10827
which is included in
[v1.43.0](https://github.com/googleapis/google-cloud-go/releases/tag/pubsub%2Fv1.43.0).
  • Loading branch information
radhus authored and alethenorio committed Jan 28, 2025
1 parent 11e084d commit 601fd50
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 0 deletions.
15 changes: 15 additions & 0 deletions cloudpubsub/httphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"net/http"
"time"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
"go.einride.tech/cloudrunner/cloudrequestlog"
"go.einride.tech/cloudrunner/cloudstatus"
"go.opentelemetry.io/otel/propagation"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -55,6 +57,19 @@ func (fn httpHandlerFn) ServeHTTP(w http.ResponseWriter, r *http.Request) {
fields.Add(slog.Any("pubsubMessage", &pubsubMessage))
}
ctx := withSubscription(r.Context(), payload.Subscription)
carrier := pubsub.NewMessageCarrierFromPB(&pubsubMessage)
b, err := json.Marshal(carrier)
if err != nil {
slog.WarnContext(ctx, "unable to marshal carrier", slog.Any("error", err))
} else {
slog.InfoContext(
ctx,
"extracted carrier trace from pubsub",
slog.String("trace", string(b)),
slog.Any("attributes", pubsubMessage.Attributes),

Check failure on line 69 in cloudpubsub/httphandler.go

View workflow job for this annotation

GitHub Actions / build

avoid direct access to proto field pubsubMessage.Attributes, use pubsubMessage.GetAttributes() instead (protogetter)
)
}
ctx = propagation.TraceContext{}.Extract(ctx, pubsub.NewMessageCarrierFromPB(&pubsubMessage))
if err := fn(ctx, &pubsubMessage); err != nil {
if fields, ok := cloudrequestlog.GetAdditionalFields(r.Context()); ok {
fields.Add(slog.Any("error", err))
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
cloud.google.com/go v0.117.0 // indirect
cloud.google.com/go/auth v0.13.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.6 // indirect
cloud.google.com/go/iam v1.2.2 // indirect
cloud.google.com/go/longrunning v0.6.2 // indirect
cloud.google.com/go/monitoring v1.21.2 // indirect
cloud.google.com/go/trace v1.11.2 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ cloud.google.com/go/compute/metadata v0.6.0 h1:A6hENjEsCDtC1k8byVsgwvVcioamEHvZ4
cloud.google.com/go/compute/metadata v0.6.0/go.mod h1:FjyFAW1MW0C203CEOMDTu3Dk1FlqW3Rga40jzHL4hfg=
cloud.google.com/go/iam v1.2.2 h1:ozUSofHUGf/F4tCNy/mu9tHLTaxZFLOUiKzjcgWHGIA=
cloud.google.com/go/iam v1.2.2/go.mod h1:0Ys8ccaZHdI1dEUilwzqng/6ps2YB6vRsjIe00/+6JY=
cloud.google.com/go/kms v1.20.1 h1:og29Wv59uf2FVaZlesaiDAqHFzHaoUyHI3HYp9VUHVg=
cloud.google.com/go/kms v1.20.1/go.mod h1:LywpNiVCvzYNJWS9JUcGJSVTNSwPwi0vBAotzDqn2nc=
cloud.google.com/go/logging v1.13.0 h1:7j0HgAp0B94o1YRDqiqm26w4q1rDMH7XNRU34lJXHYc=
cloud.google.com/go/logging v1.13.0/go.mod h1:36CoKh6KA/M0PbhPKMq6/qety2DCAErbhXT62TuXALA=
cloud.google.com/go/longrunning v0.6.2 h1:xjDfh1pQcWPEvnfjZmwjKQEcHnpz6lHjfy7Fo0MK+hc=
Expand Down Expand Up @@ -150,6 +152,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.einride.tech/aip v0.68.0 h1:4seM66oLzTpz50u4K1zlJyOXQ3tCzcJN7I22tKkjipw=
go.einride.tech/aip v0.68.0/go.mod h1:7y9FF8VtPWqpxuAxl0KQWqaULxW4zFIesD6zF5RIHHg=
go.einride.tech/protobuf-sensitive v0.8.0 h1:EoeKBbjJFc+K1xvfut6Wat0AY1eMAdmBYmGiBdK8Plw=
go.einride.tech/protobuf-sensitive v0.8.0/go.mod h1:YkjV9z/m+HxFxtvdEUKdjUhImB5QjTD+mzqAqP5pAnU=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
Expand Down

0 comments on commit 601fd50

Please sign in to comment.