diff --git a/examples/memstore-plugin/main.go b/examples/memstore-plugin/main.go index 76018ef9155..03ea1db8abf 100644 --- a/examples/memstore-plugin/main.go +++ b/examples/memstore-plugin/main.go @@ -19,7 +19,12 @@ import ( "path" "strings" + "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" + "github.com/hashicorp/go-plugin" + "github.com/opentracing/opentracing-go" "github.com/spf13/viper" + jaegerClientConfig "github.com/uber/jaeger-client-go/config" + googleGRPC "google.golang.org/grpc" "github.com/jaegertracing/jaeger/plugin/storage/grpc" "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" @@ -28,6 +33,10 @@ import ( "github.com/jaegertracing/jaeger/storage/spanstore" ) +const ( + serviceName = "mem-store" +) + var configPath string func main() { @@ -46,13 +55,34 @@ func main() { opts := memory.Options{} opts.InitFromViper(v) - plugin := &memoryStorePlugin{ + traceCfg := &jaegerClientConfig.Configuration{ + ServiceName: serviceName, + Sampler: &jaegerClientConfig.SamplerConfig{ + Type: "const", + Param: 1.0, + }, + RPCMetrics: true, + } + + tracer, closer, err := traceCfg.NewTracer() + if err != nil { + panic("Failed to initialize tracer") + } + defer closer.Close() + opentracing.SetGlobalTracer(tracer) + + memStorePlugin := &memoryStorePlugin{ store: memory.NewStore(), archiveStore: memory.NewStore(), } - grpc.Serve(&shared.PluginServices{ - Store: plugin, - ArchiveStore: plugin, + grpc.ServeWithGRPCServer(&shared.PluginServices{ + Store: memStorePlugin, + ArchiveStore: memStorePlugin, + }, func(options []googleGRPC.ServerOption) *googleGRPC.Server { + return plugin.DefaultGRPCServer([]googleGRPC.ServerOption{ + googleGRPC.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)), + googleGRPC.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(tracer)), + }) }) } diff --git a/go.mod b/go.mod index d3579f4beaa..c6792a0df0b 100644 --- a/go.mod +++ b/go.mod @@ -30,8 +30,9 @@ require ( github.com/gorilla/handlers v1.5.1 github.com/gorilla/mux v1.8.0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 + github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 github.com/hashicorp/go-hclog v0.16.1 - github.com/hashicorp/go-plugin v1.4.1 + github.com/hashicorp/go-plugin v1.4.2 github.com/hashicorp/yamux v0.0.0-20190923154419-df201c70410d // indirect github.com/kr/pretty v0.2.1 github.com/mailru/easyjson v0.7.7 // indirect diff --git a/go.sum b/go.sum index 3870e8e1aa0..3452e673730 100644 --- a/go.sum +++ b/go.sum @@ -348,6 +348,7 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= +github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 h1:MJG/KsmcqMwFAkh8mTnAwhyKoB+sTAnY4CACC110tbU= github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= @@ -363,8 +364,8 @@ github.com/hashicorp/go-hclog v0.16.1/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39 github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= -github.com/hashicorp/go-plugin v1.4.1 h1:6UltRQlLN9iZO513VveELp5xyaFxVD2+1OVylE+2E+w= -github.com/hashicorp/go-plugin v1.4.1/go.mod h1:5fGEH17QVwTTcR0zV7yhDPLLmFX9YSZ38b18Udy6vYQ= +github.com/hashicorp/go-plugin v1.4.2 h1:yFvG3ufXXpqiMiZx9HLcaK3XbIqQ1WJFR/F1a2CuVw0= +github.com/hashicorp/go-plugin v1.4.2/go.mod h1:5fGEH17QVwTTcR0zV7yhDPLLmFX9YSZ38b18Udy6vYQ= github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU= github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= diff --git a/plugin/storage/grpc/README.md b/plugin/storage/grpc/README.md index 0db8fd50aaf..fd64d06c79a 100644 --- a/plugin/storage/grpc/README.md +++ b/plugin/storage/grpc/README.md @@ -150,6 +150,28 @@ There are more logger options that can be used with `hclog` listed on [godoc](ht Note: Setting the `Output` option to `os.Stdout` can confuse the `go-plugin` framework and lead it to consider the plugin errored. +Tracing +------- + +When `grpc-plugin` is used, it will be running as a separated process, thus context propagation is necessary for inter-process scenarios. + +In order to get complete traces containing both `jaeger-component`(s) and the `grpc-plugin`, developers should enable tracing at server-side. +Thus, we can leverage gRPC interceptors, + +```golang +grpc.ServeWithGRPCServer(&shared.PluginServices{ + Store: memStorePlugin, + ArchiveStore: memStorePlugin, +}, func(options []googleGRPC.ServerOption) *googleGRPC.Server { + return plugin.DefaultGRPCServer([]googleGRPC.ServerOption{ + googleGRPC.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)), + googleGRPC.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(tracer)), + }) +}) +``` + +Refer to `example/memstore-plugin` for more details. + Bearer token propagation from the UI ------------------------------------ When using `--query.bearer-token-propagation=true`, the bearer token will be properly passed on to the gRPC plugin server. To get access to the bearer token in your plugin, use a method similar to: diff --git a/plugin/storage/grpc/config/config.go b/plugin/storage/grpc/config/config.go index 6ee887d39c9..72a04a422ee 100644 --- a/plugin/storage/grpc/config/config.go +++ b/plugin/storage/grpc/config/config.go @@ -19,8 +19,11 @@ import ( "os/exec" "runtime" + "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" + "github.com/opentracing/opentracing-go" + "google.golang.org/grpc" "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" ) @@ -58,6 +61,10 @@ func (c *Configuration) Build() (*ClientPluginServices, error) { Logger: hclog.New(&hclog.LoggerOptions{ Level: hclog.LevelFromString(c.PluginLogLevel), }), + GRPCDialOptions: []grpc.DialOption{ + grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())), + grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer())), + }, }) runtime.SetFinalizer(client, func(c *plugin.Client) { diff --git a/plugin/storage/grpc/shared/archive.go b/plugin/storage/grpc/shared/archive.go index cd74f038c0f..41741e7b950 100644 --- a/plugin/storage/grpc/shared/archive.go +++ b/plugin/storage/grpc/shared/archive.go @@ -44,7 +44,7 @@ type archiveWriter struct { // GetTrace takes a traceID and returns a Trace associated with that traceID from Archive Storage func (r *archiveReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { - stream, err := r.client.GetArchiveTrace(upgradeContextWithBearerToken(ctx), &storage_v1.GetTraceRequest{ + stream, err := r.client.GetArchiveTrace(upgradeContext(ctx), &storage_v1.GetTraceRequest{ TraceID: traceID, }) if status.Code(err) == codes.NotFound { diff --git a/plugin/storage/grpc/shared/grpc_client.go b/plugin/storage/grpc/shared/grpc_client.go index ec09629b7ef..f8342847044 100644 --- a/plugin/storage/grpc/shared/grpc_client.go +++ b/plugin/storage/grpc/shared/grpc_client.go @@ -34,6 +34,9 @@ var ( _ StoragePlugin = (*grpcClient)(nil) _ ArchiveStoragePlugin = (*grpcClient)(nil) _ PluginCapabilities = (*grpcClient)(nil) + + // upgradeContext composites several steps of upgrading context + upgradeContext = composeContextUpgradeFuncs(upgradeContextWithBearerToken) ) // grpcClient implements shared.StoragePlugin and reads/writes spans and dependencies @@ -46,16 +49,32 @@ type grpcClient struct { depsReaderClient storage_v1.DependenciesReaderPluginClient } +// ContextUpgradeFunc is a functional type that can be composed to upgrade context +type ContextUpgradeFunc func(ctx context.Context) context.Context + +// composeContextUpgradeFuncs composes ContextUpgradeFunc and returns a composed function +// to run the given func in strict order. +func composeContextUpgradeFuncs(funcs ...ContextUpgradeFunc) ContextUpgradeFunc { + return func(ctx context.Context) context.Context { + for _, fun := range funcs { + ctx = fun(ctx) + } + return ctx + } +} + // upgradeContextWithBearerToken turns the context into a gRPC outgoing context with bearer token // in the request metadata, if the original context has bearer token attached. // Otherwise returns original context. func upgradeContextWithBearerToken(ctx context.Context) context.Context { bearerToken, hasToken := spanstore.GetBearerToken(ctx) if hasToken { - requestMetadata := metadata.New(map[string]string{ - spanstore.BearerTokenKey: bearerToken, - }) - return metadata.NewOutgoingContext(ctx, requestMetadata) + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + md = metadata.New(nil) + } + md.Set(spanstore.BearerTokenKey, bearerToken) + return metadata.NewOutgoingContext(ctx, md) } return ctx } @@ -85,7 +104,7 @@ func (c *grpcClient) ArchiveSpanWriter() spanstore.Writer { // GetTrace takes a traceID and returns a Trace associated with that traceID func (c *grpcClient) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { - stream, err := c.readerClient.GetTrace(upgradeContextWithBearerToken(ctx), &storage_v1.GetTraceRequest{ + stream, err := c.readerClient.GetTrace(upgradeContext(ctx), &storage_v1.GetTraceRequest{ TraceID: traceID, }) if status.Code(err) == codes.NotFound { @@ -100,7 +119,7 @@ func (c *grpcClient) GetTrace(ctx context.Context, traceID model.TraceID) (*mode // GetServices returns a list of all known services func (c *grpcClient) GetServices(ctx context.Context) ([]string, error) { - resp, err := c.readerClient.GetServices(upgradeContextWithBearerToken(ctx), &storage_v1.GetServicesRequest{}) + resp, err := c.readerClient.GetServices(upgradeContext(ctx), &storage_v1.GetServicesRequest{}) if err != nil { return nil, fmt.Errorf("plugin error: %w", err) } @@ -113,7 +132,7 @@ func (c *grpcClient) GetOperations( ctx context.Context, query spanstore.OperationQueryParameters, ) ([]spanstore.Operation, error) { - resp, err := c.readerClient.GetOperations(upgradeContextWithBearerToken(ctx), &storage_v1.GetOperationsRequest{ + resp, err := c.readerClient.GetOperations(upgradeContext(ctx), &storage_v1.GetOperationsRequest{ Service: query.ServiceName, SpanKind: query.SpanKind, }) @@ -141,7 +160,7 @@ func (c *grpcClient) GetOperations( // FindTraces retrieves traces that match the traceQuery func (c *grpcClient) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { - stream, err := c.readerClient.FindTraces(upgradeContextWithBearerToken(ctx), &storage_v1.FindTracesRequest{ + stream, err := c.readerClient.FindTraces(upgradeContext(ctx), &storage_v1.FindTracesRequest{ Query: &storage_v1.TraceQueryParameters{ ServiceName: query.ServiceName, OperationName: query.OperationName, @@ -179,7 +198,7 @@ func (c *grpcClient) FindTraces(ctx context.Context, query *spanstore.TraceQuery // FindTraceIDs retrieves traceIDs that match the traceQuery func (c *grpcClient) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) { - resp, err := c.readerClient.FindTraceIDs(upgradeContextWithBearerToken(ctx), &storage_v1.FindTraceIDsRequest{ + resp, err := c.readerClient.FindTraceIDs(upgradeContext(ctx), &storage_v1.FindTraceIDsRequest{ Query: &storage_v1.TraceQueryParameters{ ServiceName: query.ServiceName, OperationName: query.OperationName,