Skip to content

Commit

Permalink
Prevent infinite loop in gRPC tracing during span storage.
Browse files Browse the repository at this point in the history
  • Loading branch information
ldlb9527 committed Sep 12, 2024
1 parent 7787010 commit ebf423a
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 23 deletions.
33 changes: 19 additions & 14 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
noopmetric "go.opentelemetry.io/otel/metric/noop"
Expand Down Expand Up @@ -49,8 +48,8 @@ type Factory struct {
configV1 Configuration
configV2 *ConfigV2

services *ClientPluginServices
remoteConn *grpc.ClientConn
services *ClientPluginServices
remoteConns []*grpc.ClientConn
}

// NewFactory creates a new Factory.
Expand Down Expand Up @@ -101,8 +100,8 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
return noopmetric.NewMeterProvider()
},
}
newClientFn := func(opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
return f.configV2.ToClientConn(context.Background(), componenttest.NewNopHost(), telset, opts...)
newClientFn := func(telSettings component.TelemetrySettings, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
return f.configV2.ToClientConn(context.Background(), componenttest.NewNopHost(), telSettings, opts...)
}

var err error
Expand All @@ -114,13 +113,11 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
return nil
}

type newClientFn func(opts ...grpc.DialOption) (*grpc.ClientConn, error)
type newClientFn func(telSettings component.TelemetrySettings, opts ...grpc.DialOption) (*grpc.ClientConn, error)

func (f *Factory) newRemoteStorage(telset component.TelemetrySettings, newClient newClientFn) (*ClientPluginServices, error) {
c := f.configV2
opts := []grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telset.TracerProvider))),
}
opts := make([]grpc.DialOption, 0)
if c.Auth != nil {
return nil, fmt.Errorf("authenticator is not supported")
}
Expand All @@ -131,12 +128,20 @@ func (f *Factory) newRemoteStorage(telset component.TelemetrySettings, newClient
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
}

remoteConn, err := newClient(opts...)
traceConn, err := newClient(telset, opts...)
if err != nil {
return nil, fmt.Errorf("error creating remote storage client: %w", err)
}
f.remoteConns = append(f.remoteConns, traceConn)

telset.TracerProvider = nil
noTraceConn, err := newClient(telset, opts...)
if err != nil {
return nil, fmt.Errorf("error creating remote storage client: %w", err)
}
f.remoteConn = remoteConn
grpcClient := shared.NewGRPCClient(remoteConn)
f.remoteConns = append(f.remoteConns, noTraceConn)

grpcClient := shared.NewGRPCClient(traceConn, noTraceConn)
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: grpcClient,
Expand Down Expand Up @@ -200,8 +205,8 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
// Close closes the resources held by the factory
func (f *Factory) Close() error {
var errs []error
if f.remoteConn != nil {
errs = append(errs, f.remoteConn.Close())
for i := range f.remoteConns {
errs = append(errs, f.remoteConns[i].Close())
}
errs = append(errs, f.configV1.RemoteTLS.Close())
return errors.Join(errs...)
Expand Down
16 changes: 8 additions & 8 deletions plugin/storage/grpc/shared/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ type GRPCClient struct {
streamWriterClient storage_v1.StreamingSpanWriterPluginClient
}

func NewGRPCClient(c *grpc.ClientConn) *GRPCClient {
func NewGRPCClient(traceConn *grpc.ClientConn, noTraceConn *grpc.ClientConn) *GRPCClient {
return &GRPCClient{
readerClient: storage_v1.NewSpanReaderPluginClient(c),
writerClient: storage_v1.NewSpanWriterPluginClient(c),
archiveReaderClient: storage_v1.NewArchiveSpanReaderPluginClient(c),
archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(c),
capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(c),
depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c),
streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(c),
readerClient: storage_v1.NewSpanReaderPluginClient(traceConn),
writerClient: storage_v1.NewSpanWriterPluginClient(noTraceConn),
archiveReaderClient: storage_v1.NewArchiveSpanReaderPluginClient(traceConn),
archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(noTraceConn),
capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(traceConn),
depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(traceConn),
streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(noTraceConn),
}
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/grpc/shared/grpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func withGRPCClient(fn func(r *grpcClientTest)) {

func TestNewGRPCClient(t *testing.T) {
conn := &grpc.ClientConn{}
client := NewGRPCClient(conn)
client := NewGRPCClient(conn, conn)
assert.NotNil(t, client)

assert.Implements(t, (*storage_v1.SpanReaderPluginClient)(nil), client.readerClient)
Expand Down

0 comments on commit ebf423a

Please sign in to comment.