Skip to content

Commit 505e8d6

Browse files
committed
feat(pubsub): extract trace information
Pass a context that contains trace information extracted from the incoming message, allowing traces to span from publishers over to subscribers. In order for publishers to include tracing information, the pubsub client should be created with: ```go client, err := pubsub.NewClientWithConfig( ctx, cloudrunner.Runtime(ctx).ProjectID, &pubsub.ClientConfig{ EnableOpenTelemetryTracing: true, }, ``` Depends on googleapis/google-cloud-go#10827 which is included in [v1.43.0](https://github.com/googleapis/google-cloud-go/releases/tag/pubsub%2Fv1.43.0).
1 parent 5dae0fa commit 505e8d6

File tree

13 files changed

+172
-8
lines changed

13 files changed

+172
-8
lines changed

.sage/main.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,10 @@ func GoTest(ctx context.Context) error {
5151

5252
func GoLint(ctx context.Context) error {
5353
sg.Logger(ctx).Println("linting Go files...")
54-
return sggolangcilint.Run(ctx)
54+
// It is currently not possible to ignore package deprecation lint errors so we ignore
55+
// it through flags and match the lint error string.
56+
// See https://github.com/golangci/golangci-lint/issues/741#issuecomment-1721737130 for more details.
57+
return sggolangcilint.Run(ctx, "--exclude", `SA1019: .go.einride.tech/cloudrunner/cloudtrace. is deprecated:`)
5558
}
5659

5760
func GoLintFix(ctx context.Context) error {

cloudotel/traceidhook.go

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package cloudotel
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
7+
"go.einride.tech/cloudrunner/cloudslog"
8+
"go.opentelemetry.io/otel/trace"
9+
)
10+
11+
// IDKey is the log entry key for trace IDs.
12+
// Experimental: May be removed in a future update.
13+
const IDKey = "traceId"
14+
15+
// TraceIDHook adds the trace ID (without the full trace resource name) to the request logger.
16+
// The trace ID can be used to filter on logs for the same trace across multiple projects.
17+
// Experimental: May be removed in a future update.
18+
func TraceIDHook(ctx context.Context, traceContext trace.SpanContext) context.Context {
19+
return cloudslog.With(ctx, slog.String(IDKey, traceContext.TraceID().String()))
20+
}

cloudotel/tracemiddleware.go

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package cloudotel
2+
3+
import (
4+
"context"
5+
"net/http"
6+
7+
gcppropagator "github.com/GoogleCloudPlatform/opentelemetry-operations-go/propagator"
8+
"go.einride.tech/cloudrunner/cloudstream"
9+
"go.einride.tech/cloudrunner/cloudzap"
10+
"go.opentelemetry.io/otel/propagation"
11+
"go.opentelemetry.io/otel/trace"
12+
"go.uber.org/zap"
13+
"google.golang.org/grpc"
14+
"google.golang.org/grpc/metadata"
15+
)
16+
17+
type TraceHook func(context.Context, trace.SpanContext) context.Context
18+
19+
// TraceMiddleware that ensures incoming traces are forwarded and included in logging.
20+
type TraceMiddleware struct {
21+
// ProjectID of the project the service is running in.
22+
ProjectID string
23+
// TraceHook is an optional callback that gets called with the parsed trace context.
24+
TraceHook TraceHook
25+
// propagator is a opentelemetry trace propagator
26+
propagator propagation.TextMapPropagator
27+
}
28+
29+
func NewTraceMiddleware() TraceMiddleware {
30+
propagator := propagation.NewCompositeTextMapPropagator(
31+
gcppropagator.CloudTraceFormatPropagator{},
32+
propagation.TraceContext{},
33+
propagation.Baggage{},
34+
)
35+
return TraceMiddleware{
36+
TraceHook: TraceIDHook,
37+
propagator: propagator,
38+
}
39+
}
40+
41+
// GRPCServerUnaryInterceptor provides unary RPC middleware for gRPC servers.
42+
func (i *TraceMiddleware) GRPCServerUnaryInterceptor(
43+
ctx context.Context,
44+
req interface{},
45+
_ *grpc.UnaryServerInfo,
46+
handler grpc.UnaryHandler,
47+
) (resp interface{}, err error) {
48+
md, ok := metadata.FromIncomingContext(ctx)
49+
if !ok {
50+
return handler(ctx, req)
51+
}
52+
carrier := propagation.HeaderCarrier(md)
53+
ctx = i.propagator.Extract(ctx, carrier)
54+
ctx = i.withLogTracing(ctx, trace.SpanContextFromContext(ctx))
55+
return handler(ctx, req)
56+
}
57+
58+
// GRPCStreamServerInterceptor adds tracing metadata to streaming RPCs.
59+
func (i *TraceMiddleware) GRPCStreamServerInterceptor(
60+
srv interface{},
61+
ss grpc.ServerStream,
62+
_ *grpc.StreamServerInfo,
63+
handler grpc.StreamHandler,
64+
) (err error) {
65+
md, ok := metadata.FromIncomingContext(ss.Context())
66+
if !ok {
67+
return handler(srv, ss)
68+
}
69+
ctx := ss.Context()
70+
carrier := propagation.HeaderCarrier(md)
71+
ctx = i.propagator.Extract(ctx, carrier)
72+
ctx = i.withLogTracing(ctx, trace.SpanContextFromContext(ctx))
73+
return handler(srv, cloudstream.NewContextualServerStream(ctx, ss))
74+
}
75+
76+
// HTTPServer provides middleware for HTTP servers.
77+
func (i *TraceMiddleware) HTTPServer(next http.Handler) http.Handler {
78+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
79+
carrier := propagation.HeaderCarrier(r.Header)
80+
ctx := i.propagator.Extract(r.Context(), carrier)
81+
ctx = i.withLogTracing(ctx, trace.SpanContextFromContext(ctx))
82+
next.ServeHTTP(w, r.WithContext(ctx))
83+
})
84+
}
85+
86+
func (i *TraceMiddleware) withLogTracing(ctx context.Context, spanCtx trace.SpanContext) context.Context {
87+
if i.TraceHook != nil {
88+
ctx = i.TraceHook(ctx, spanCtx)
89+
}
90+
fields := make([]zap.Field, 0, 3)
91+
fields = append(fields, cloudzap.Trace(i.ProjectID, spanCtx.TraceID().String()))
92+
if spanCtx.SpanID().String() != "" {
93+
fields = append(fields, cloudzap.SpanID(spanCtx.SpanID().String()))
94+
}
95+
if spanCtx.IsSampled() {
96+
fields = append(fields, cloudzap.TraceSampled(spanCtx.IsSampled()))
97+
}
98+
return cloudzap.WithLoggerFields(ctx, fields...)
99+
}

cloudpubsub/httphandler.go

+6
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ import (
77
"net/http"
88
"time"
99

10+
"cloud.google.com/go/pubsub"
1011
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
1112
"go.einride.tech/cloudrunner/cloudrequestlog"
1213
"go.einride.tech/cloudrunner/cloudstatus"
14+
"go.opentelemetry.io/otel/propagation"
1315
"google.golang.org/grpc/status"
1416
"google.golang.org/protobuf/types/known/timestamppb"
1517
)
@@ -55,6 +57,10 @@ func (fn httpHandlerFn) ServeHTTP(w http.ResponseWriter, r *http.Request) {
5557
fields.Add(slog.Any("pubsubMessage", &pubsubMessage))
5658
}
5759
ctx := withSubscription(r.Context(), payload.Subscription)
60+
tc := propagation.TraceContext{}
61+
ctx = tc.Extract(ctx, pubsub.NewMessageCarrierFromPB(&pubsubMessage))
62+
carrier := make(propagation.MapCarrier)
63+
tc.Inject(ctx, &carrier)
5864
if err := fn(ctx, &pubsubMessage); err != nil {
5965
if fields, ok := cloudrequestlog.GetAdditionalFields(r.Context()); ok {
6066
fields.Add(slog.Any("error", err))

cloudtesting/trace.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ package cloudtesting
33
import (
44
"context"
55

6-
"go.einride.tech/cloudrunner/cloudtrace"
6+
cloudtrace "go.einride.tech/cloudrunner/cloudtrace"
77
"google.golang.org/grpc/metadata"
88
)
99

1010
// WithIncomingTraceContext returns a new context with the specified trace.
11+
// Deprecated: use opentelemetry trace.ContextWithSpanContext instead.
1112
func WithIncomingTraceContext(ctx context.Context, traceContext cloudtrace.Context) context.Context {
1213
md, _ := metadata.FromIncomingContext(ctx)
1314
return metadata.NewIncomingContext(

cloudtrace/doc.go

+4
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,6 @@
11
// Package cloudtrace provides primitives for Cloud Trace integration.
2+
//
3+
// Deprecated: Google Cloud now officially supports the W3C standard for trace-context in their various products
4+
// and recommends using that instead. On top of that OpenTelemetry now also has official support for tracing so we
5+
// recommend using opentelemetry package for working with traces.
26
package cloudtrace

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ require (
4343
cloud.google.com/go v0.118.1 // indirect
4444
cloud.google.com/go/auth v0.15.0 // indirect
4545
cloud.google.com/go/auth/oauth2adapt v0.2.7 // indirect
46+
cloud.google.com/go/iam v1.3.1 // indirect
4647
cloud.google.com/go/longrunning v0.6.4 // indirect
4748
cloud.google.com/go/monitoring v1.23.0 // indirect
4849
cloud.google.com/go/trace v1.11.3 // indirect

go.sum

+4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ cloud.google.com/go/compute/metadata v0.6.0 h1:A6hENjEsCDtC1k8byVsgwvVcioamEHvZ4
1111
cloud.google.com/go/compute/metadata v0.6.0/go.mod h1:FjyFAW1MW0C203CEOMDTu3Dk1FlqW3Rga40jzHL4hfg=
1212
cloud.google.com/go/iam v1.3.1 h1:KFf8SaT71yYq+sQtRISn90Gyhyf4X8RGgeAVC8XGf3E=
1313
cloud.google.com/go/iam v1.3.1/go.mod h1:3wMtuyT4NcbnYNPLMBzYRFiEfjKfJlLVLrisE7bwm34=
14+
cloud.google.com/go/kms v1.20.5 h1:aQQ8esAIVZ1atdJRxihhdxGQ64/zEbJoJnCz/ydSmKg=
15+
cloud.google.com/go/kms v1.20.5/go.mod h1:C5A8M1sv2YWYy1AE6iSrnddSG9lRGdJq5XEdBy28Lmw=
1416
cloud.google.com/go/logging v1.13.0 h1:7j0HgAp0B94o1YRDqiqm26w4q1rDMH7XNRU34lJXHYc=
1517
cloud.google.com/go/logging v1.13.0/go.mod h1:36CoKh6KA/M0PbhPKMq6/qety2DCAErbhXT62TuXALA=
1618
cloud.google.com/go/longrunning v0.6.4 h1:3tyw9rO3E2XVXzSApn1gyEEnH2K9SynNQjMlBi3uHLg=
@@ -163,6 +165,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
163165
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
164166
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
165167
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
168+
go.einride.tech/aip v0.68.1 h1:16/AfSxcQISGN5z9C5lM+0mLYXihrHbQ1onvYTr93aQ=
169+
go.einride.tech/aip v0.68.1/go.mod h1:XaFtaj4HuA3Zwk9xoBtTWgNubZ0ZZXv9BZJCkuKuWbg=
166170
go.einride.tech/protobuf-sensitive v0.8.0 h1:EoeKBbjJFc+K1xvfut6Wat0AY1eMAdmBYmGiBdK8Plw=
167171
go.einride.tech/protobuf-sensitive v0.8.0/go.mod h1:YkjV9z/m+HxFxtvdEUKdjUhImB5QjTD+mzqAqP5pAnU=
168172
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=

grpcserver.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,23 @@ func NewGRPCServer(ctx context.Context, opts ...grpc.ServerOption) *grpc.Server
1818
if !ok {
1919
panic("cloudrunner.NewGRPCServer: must be called with a context from cloudrunner.Run")
2020
}
21+
unaryTracing := run.otelTraceMiddleware.GRPCServerUnaryInterceptor
22+
streamTracing := run.otelTraceMiddleware.GRPCStreamServerInterceptor
23+
if run.useLegacyTracing {
24+
unaryTracing = run.traceMiddleware.GRPCServerUnaryInterceptor
25+
streamTracing = run.traceMiddleware.GRPCStreamServerInterceptor
26+
}
2127
serverOptions := []grpc.ServerOption{
2228
grpc.StatsHandler(otelgrpc.NewServerHandler()),
2329
grpc.ChainUnaryInterceptor(
24-
run.loggerMiddleware.GRPCUnaryServerInterceptor, // adds context logger
25-
run.traceMiddleware.GRPCServerUnaryInterceptor, // needs the context logger
30+
run.loggerMiddleware.GRPCUnaryServerInterceptor, // adds context logger
31+
unaryTracing, // needs the context logger
2632
run.requestLoggerMiddleware.GRPCUnaryServerInterceptor, // needs to run after trace
2733
run.serverMiddleware.GRPCUnaryServerInterceptor, // needs to run after request logger
2834
),
2935
grpc.ChainStreamInterceptor(
3036
run.loggerMiddleware.GRPCStreamServerInterceptor,
31-
run.traceMiddleware.GRPCStreamServerInterceptor,
37+
streamTracing,
3238
run.requestLoggerMiddleware.GRPCStreamServerInterceptor,
3339
run.serverMiddleware.GRPCStreamServerInterceptor,
3440
),

httpserver.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,16 @@ func NewHTTPServer(ctx context.Context, handler http.Handler, middlewares ...HTT
2424
if !ok {
2525
panic("cloudrunner.NewHTTPServer: must be called with a context from cloudrunner.Run")
2626
}
27+
tracingMiddleware := run.otelTraceMiddleware.HTTPServer
28+
if run.useLegacyTracing {
29+
tracingMiddleware = run.traceMiddleware.HTTPServer
30+
}
2731
defaultMiddlewares := []cloudserver.HTTPMiddleware{
2832
func(handler http.Handler) http.Handler {
2933
return otelhttp.NewHandler(handler, "server")
3034
},
3135
run.loggerMiddleware.HTTPServer,
32-
run.traceMiddleware.HTTPServer,
36+
tracingMiddleware,
3337
run.requestLoggerMiddleware.HTTPServer,
3438
run.securityHeadersMiddleware.HTTPServer,
3539
run.serverMiddleware.HTTPServer,

options.go

+10
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55

66
"go.einride.tech/cloudrunner/cloudconfig"
7+
"go.einride.tech/cloudrunner/cloudotel"
78
"go.einride.tech/cloudrunner/cloudtrace"
89
"google.golang.org/grpc"
910
"google.golang.org/protobuf/proto"
@@ -42,8 +43,17 @@ func WithGRPCServerOptions(grpcServerOptions ...grpc.ServerOption) Option {
4243
}
4344

4445
// WithTraceHook configures the run context with a trace hook.
46+
// Deprecated: use WithOtelTraceHook instead.
4547
func WithTraceHook(traceHook func(context.Context, cloudtrace.Context) context.Context) Option {
4648
return func(run *runContext) {
49+
run.useLegacyTracing = true
4750
run.traceMiddleware.TraceHook = traceHook
4851
}
4952
}
53+
54+
// WithTraceHook configures the run context with a trace hook.
55+
func WithOtelTraceHook(traceHook cloudotel.TraceHook) Option {
56+
return func(run *runContext) {
57+
run.otelTraceMiddleware.TraceHook = traceHook
58+
}
59+
}

run.go

+5
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func Run(fn func(context.Context) error, options ...Option) (err error) {
5656
flag.Parse()
5757
flag.CommandLine.SetOutput(os.Stdout)
5858
var run runContext
59+
run.otelTraceMiddleware = cloudotel.NewTraceMiddleware()
5960
for _, option := range options {
6061
option(&run)
6162
}
@@ -87,10 +88,12 @@ func Run(fn func(context.Context) error, options ...Option) (err error) {
8788
if *validate {
8889
return nil
8990
}
91+
9092
run.traceMiddleware.ProjectID = run.config.Runtime.ProjectID
9193
if run.traceMiddleware.TraceHook == nil {
9294
run.traceMiddleware.TraceHook = cloudtrace.IDHook
9395
}
96+
run.otelTraceMiddleware.ProjectID = run.config.Runtime.ProjectID
9497
run.serverMiddleware.Config = run.config.Server
9598
run.requestLoggerMiddleware.Config = run.config.RequestLogger
9699
ctx = withRunContext(ctx, &run)
@@ -179,7 +182,9 @@ type runContext struct {
179182
serverMiddleware cloudserver.Middleware
180183
clientMiddleware cloudclient.Middleware
181184
requestLoggerMiddleware cloudrequestlog.Middleware
185+
useLegacyTracing bool
182186
traceMiddleware cloudtrace.Middleware
187+
otelTraceMiddleware cloudotel.TraceMiddleware
183188
securityHeadersMiddleware cloudserver.SecurityHeadersMiddleware
184189
}
185190

trace.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,17 @@ package cloudrunner
33
import (
44
"context"
55

6-
"go.einride.tech/cloudrunner/cloudtrace"
6+
cloudtrace "go.einride.tech/cloudrunner/cloudtrace"
77
)
88

99
// IncomingTraceContext returns the Cloud Trace context from the incoming request metadata.
10-
// Deprecated: Use GetTraceContext instead.
10+
// Deprecated: Use opentelemetry trace.SpanContextFromContext instead.
1111
func IncomingTraceContext(ctx context.Context) (cloudtrace.Context, bool) {
1212
return cloudtrace.FromIncomingContext(ctx)
1313
}
1414

1515
// GetTraceContext returns the Cloud Trace context from the incoming request.
16+
// Deprecated: Use opentelemetry trace.SpanContextFromContext instead.
1617
func GetTraceContext(ctx context.Context) (cloudtrace.Context, bool) {
1718
return cloudtrace.GetContext(ctx)
1819
}

0 commit comments

Comments
 (0)