From a4efe57dcc382a2927a0b0d26d75809d2eae84a7 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab <43658574+mahadzaryab1@users.noreply.github.com> Date: Sun, 27 Oct 2024 00:44:48 -0400 Subject: [PATCH] [fix][grpc] Disable tracing in grpc storage writer clients (#6125) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Which problem is this PR solving? - Fixes #5971 - Towards #6113 and #5859 ## Description of the changes - This PR fixes an issue where the GRPC remote storage client was provided a tracer which was resulting in an infinite loop of trace generation. This infinite loop would happen when we would try to write a trace to storage which would generate a new trace that needed to be written and so on. This PR provides a fix for this by using a noop tracer for the writer clients so that we do not generate traces on the write paths but still do so when reading. - This is likely just a temporary fix and we'll want to monitor https://github.com/open-telemetry/opentelemetry-collector/issues/10663 for a better long-term fix. ## How was this change tested? - Added the healthcheck endpoint which was previously failing in #6113. ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` ## Co-Authors This PR is a continuation of https://github.com/jaegertracing/jaeger/pull/5979 Co-authored-by: cx <1249843194@qq.com> --------- Signed-off-by: Mahad Zaryab --- cmd/jaeger/internal/integration/grpc_test.go | 4 - plugin/storage/grpc/factory.go | 98 ++++++++++++------- plugin/storage/grpc/factory_test.go | 6 +- plugin/storage/grpc/shared/grpc_client.go | 16 +-- .../storage/grpc/shared/grpc_client_test.go | 2 +- 5 files changed, 77 insertions(+), 49 deletions(-) diff --git a/cmd/jaeger/internal/integration/grpc_test.go b/cmd/jaeger/internal/integration/grpc_test.go index 4f7c1ceefd2..cd12984bc62 100644 --- a/cmd/jaeger/internal/integration/grpc_test.go +++ b/cmd/jaeger/internal/integration/grpc_test.go @@ -4,11 +4,9 @@ package integration import ( - "fmt" "testing" "github.com/jaegertracing/jaeger/plugin/storage/integration" - "github.com/jaegertracing/jaeger/ports" ) type GRPCStorageIntegration struct { @@ -32,8 +30,6 @@ func TestGRPCStorage(t *testing.T) { s := &GRPCStorageIntegration{ E2EStorageIntegration: E2EStorageIntegration{ ConfigFile: "../../config-remote-storage.yaml", - // TODO this should be removed in favor of default health check endpoint - HealthCheckEndpoint: fmt.Sprintf("http://localhost:%d/", ports.QueryHTTP), }, } s.CleanUp = s.cleanUp diff --git a/plugin/storage/grpc/factory.go b/plugin/storage/grpc/factory.go index 12ec6e5ceb5..0f08667e194 100644 --- a/plugin/storage/grpc/factory.go +++ b/plugin/storage/grpc/factory.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/otel/metric" noopmetric "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" "go.uber.org/zap" "google.golang.org/grpc" @@ -42,13 +43,14 @@ var ( // interface comformance checks // Factory implements storage.Factory and creates storage components backed by a storage plugin. type Factory struct { - metricsFactory metrics.Factory - logger *zap.Logger - tracerProvider trace.TracerProvider - config Config - services *ClientPluginServices - remoteConn *grpc.ClientConn - host component.Host + metricsFactory metrics.Factory + logger *zap.Logger + tracerProvider trace.TracerProvider + config Config + services *ClientPluginServices + tracedRemoteConn *grpc.ClientConn + untracedRemoteConn *grpc.ClientConn + host component.Host } // NewFactory creates a new Factory. @@ -91,15 +93,9 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) f.metricsFactory, f.logger = metricsFactory, logger f.tracerProvider = otel.GetTracerProvider() - telset := component.TelemetrySettings{ - Logger: logger, - TracerProvider: f.tracerProvider, - // TODO needs to be joined with the metricsFactory - LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider { - return noopmetric.NewMeterProvider() - }, - } - newClientFn := func(opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) { + tracedTelset := getTelset(logger, f.tracerProvider) + untracedTelset := getTelset(logger, noop.NewTracerProvider()) + newClientFn := func(telset component.TelemetrySettings, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) { clientOpts := make([]configgrpc.ToClientConnOption, 0) for _, opt := range opts { clientOpts = append(clientOpts, configgrpc.WithGrpcDialOption(opt)) @@ -108,7 +104,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) } var err error - f.services, err = f.newRemoteStorage(telset, newClientFn) + f.services, err = f.newRemoteStorage(tracedTelset, untracedTelset, newClientFn) if err != nil { return fmt.Errorf("grpc storage builder failed to create a store: %w", err) } @@ -116,32 +112,54 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) return nil } -type newClientFn func(opts ...grpc.DialOption) (*grpc.ClientConn, error) +type newClientFn func(telset component.TelemetrySettings, opts ...grpc.DialOption) (*grpc.ClientConn, error) -func (f *Factory) newRemoteStorage(telset component.TelemetrySettings, newClient newClientFn) (*ClientPluginServices, error) { +func (f *Factory) newRemoteStorage( + tracedTelset component.TelemetrySettings, + untracedTelset component.TelemetrySettings, + newClient newClientFn, +) (*ClientPluginServices, error) { c := f.config - opts := []grpc.DialOption{ - grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telset.TracerProvider))), - } if c.Auth != nil { return nil, fmt.Errorf("authenticator is not supported") } - tenancyMgr := tenancy.NewManager(&c.Tenancy) + unaryInterceptors := []grpc.UnaryClientInterceptor{ + bearertoken.NewUnaryClientInterceptor(), + } + streamInterceptors := []grpc.StreamClientInterceptor{ + tenancy.NewClientStreamInterceptor(tenancyMgr), + } if tenancyMgr.Enabled { - opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr))) - opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr))) + unaryInterceptors = append(unaryInterceptors, tenancy.NewClientUnaryInterceptor(tenancyMgr)) + streamInterceptors = append(streamInterceptors, tenancy.NewClientStreamInterceptor(tenancyMgr)) } - opts = append(opts, grpc.WithUnaryInterceptor(bearertoken.NewUnaryClientInterceptor())) - opts = append(opts, grpc.WithStreamInterceptor(bearertoken.NewStreamClientInterceptor())) + baseOpts := append( + []grpc.DialOption{}, + grpc.WithChainUnaryInterceptor(unaryInterceptors...), + grpc.WithChainStreamInterceptor(streamInterceptors...), + ) + opts := append([]grpc.DialOption{}, baseOpts...) + opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(tracedTelset.TracerProvider)))) - remoteConn, err := newClient(opts...) + tracedRemoteConn, err := newClient(tracedTelset, opts...) if err != nil { - return nil, fmt.Errorf("error creating remote storage client: %w", err) + return nil, fmt.Errorf("error creating traced remote storage client: %w", err) + } + f.tracedRemoteConn = tracedRemoteConn + untracedOpts := append([]grpc.DialOption{}, baseOpts...) + untracedOpts = append( + untracedOpts, + grpc.WithStatsHandler( + otelgrpc.NewClientHandler( + otelgrpc.WithTracerProvider(untracedTelset.TracerProvider)))) + untracedRemoteConn, err := newClient(tracedTelset, untracedOpts...) + if err != nil { + return nil, fmt.Errorf("error creating untraced remote storage client: %w", err) } - f.remoteConn = remoteConn - grpcClient := shared.NewGRPCClient(remoteConn) + f.untracedRemoteConn = untracedRemoteConn + grpcClient := shared.NewGRPCClient(tracedRemoteConn, untracedRemoteConn) return &ClientPluginServices{ PluginServices: shared.PluginServices{ Store: grpcClient, @@ -205,8 +223,22 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { // Close closes the resources held by the factory func (f *Factory) Close() error { var errs []error - if f.remoteConn != nil { - errs = append(errs, f.remoteConn.Close()) + if f.tracedRemoteConn != nil { + errs = append(errs, f.tracedRemoteConn.Close()) + } + if f.untracedRemoteConn != nil { + errs = append(errs, f.untracedRemoteConn.Close()) } return errors.Join(errs...) } + +func getTelset(logger *zap.Logger, tracerProvider trace.TracerProvider) component.TelemetrySettings { + return component.TelemetrySettings{ + Logger: logger, + TracerProvider: tracerProvider, + // TODO needs to be joined with the metricsFactory + LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider { + return noopmetric.NewMeterProvider() + }, + } +} diff --git a/plugin/storage/grpc/factory_test.go b/plugin/storage/grpc/factory_test.go index 4b0541d5d97..71494da3cf4 100644 --- a/plugin/storage/grpc/factory_test.go +++ b/plugin/storage/grpc/factory_test.go @@ -125,12 +125,12 @@ func TestNewFactoryError(t *testing.T) { f, err := NewFactoryWithConfig(Config{}, metrics.NullFactory, zap.NewNop(), componenttest.NewNopHost()) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, f.Close()) }) - newClientFn := func(_ ...grpc.DialOption) (conn *grpc.ClientConn, err error) { + newClientFn := func(_ component.TelemetrySettings, _ ...grpc.DialOption) (conn *grpc.ClientConn, err error) { return nil, errors.New("test error") } - _, err = f.newRemoteStorage(component.TelemetrySettings{}, newClientFn) + _, err = f.newRemoteStorage(component.TelemetrySettings{}, component.TelemetrySettings{}, newClientFn) require.Error(t, err) - require.Contains(t, err.Error(), "error creating remote storage client") + require.Contains(t, err.Error(), "error creating traced remote storage client") }) } diff --git a/plugin/storage/grpc/shared/grpc_client.go b/plugin/storage/grpc/shared/grpc_client.go index ee0e910003b..99f42bde006 100644 --- a/plugin/storage/grpc/shared/grpc_client.go +++ b/plugin/storage/grpc/shared/grpc_client.go @@ -46,15 +46,15 @@ type GRPCClient struct { streamWriterClient storage_v1.StreamingSpanWriterPluginClient } -func NewGRPCClient(c *grpc.ClientConn) *GRPCClient { +func NewGRPCClient(tracedConn *grpc.ClientConn, untracedConn *grpc.ClientConn) *GRPCClient { return &GRPCClient{ - readerClient: storage_v1.NewSpanReaderPluginClient(c), - writerClient: storage_v1.NewSpanWriterPluginClient(c), - archiveReaderClient: storage_v1.NewArchiveSpanReaderPluginClient(c), - archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(c), - capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(c), - depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c), - streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(c), + readerClient: storage_v1.NewSpanReaderPluginClient(tracedConn), + writerClient: storage_v1.NewSpanWriterPluginClient(untracedConn), + archiveReaderClient: storage_v1.NewArchiveSpanReaderPluginClient(tracedConn), + archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(untracedConn), + capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(tracedConn), + depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(tracedConn), + streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(untracedConn), } } diff --git a/plugin/storage/grpc/shared/grpc_client_test.go b/plugin/storage/grpc/shared/grpc_client_test.go index 51e56df6bb5..da8d82ce4a4 100644 --- a/plugin/storage/grpc/shared/grpc_client_test.go +++ b/plugin/storage/grpc/shared/grpc_client_test.go @@ -104,7 +104,7 @@ func withGRPCClient(fn func(r *grpcClientTest)) { func TestNewGRPCClient(t *testing.T) { conn := &grpc.ClientConn{} - client := NewGRPCClient(conn) + client := NewGRPCClient(conn, conn) assert.NotNil(t, client) assert.Implements(t, (*storage_v1.SpanReaderPluginClient)(nil), client.readerClient)