Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix goroutine leaks in grpc integration tests #5340

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"os/exec"
"runtime"
"time"

"github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -56,7 +55,15 @@ type Configuration struct {
// ClientPluginServices defines services plugin can expose and its capabilities
type ClientPluginServices struct {
shared.PluginServices
Capabilities shared.PluginCapabilities
Capabilities shared.PluginCapabilities
killPluginClient func()
}

func (c *ClientPluginServices) Close() error {
if c.killPluginClient != nil {
c.killPluginClient()
}
return nil
}

// PluginBuilder is used to create storage plugins. Implemented by Configuration.
Expand Down Expand Up @@ -154,10 +161,6 @@ func (c *Configuration) buildPlugin(logger *zap.Logger, tracerProvider trace.Tra
GRPCDialOptions: opts,
})

runtime.SetFinalizer(client, func(c *plugin.Client) {
c.Kill()
})

rpcClient, err := client.Client()
if err != nil {
return nil, fmt.Errorf("error attempting to connect to plugin rpc client: %w", err)
Expand Down Expand Up @@ -200,7 +203,8 @@ func (c *Configuration) buildPlugin(logger *zap.Logger, tracerProvider trace.Tra
ArchiveStore: archiveStoragePlugin,
StreamingSpanWriter: streamingSpanWriterPlugin,
},
Capabilities: capabilities,
Capabilities: capabilities,
killPluginClient: client.Kill,
}, nil
}

Expand Down
11 changes: 10 additions & 1 deletion plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package grpc

import (
"errors"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -53,6 +54,8 @@ type Factory struct {
archiveStore shared.ArchiveStoragePlugin
streamingSpanWriter shared.StreamingSpanWriterPlugin
capabilities shared.PluginCapabilities

servicesCloser io.Closer
}

// NewFactory creates a new Factory.
Expand Down Expand Up @@ -108,6 +111,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
f.archiveStore = services.ArchiveStore
f.capabilities = services.Capabilities
f.streamingSpanWriter = services.StreamingSpanWriter
f.servicesCloser = services
logger.Info("External plugin storage configuration", zap.Any("configuration", f.options.Configuration))
return nil
}
Expand Down Expand Up @@ -164,5 +168,10 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {

// Close closes the resources held by the factory
func (f *Factory) Close() error {
return f.builder.Close()
errs := []error{}
if f.servicesCloser != nil {
errs = append(errs, f.servicesCloser.Close())
}
errs = append(errs, f.builder.Close())
return errors.Join(errs...)
}
9 changes: 8 additions & 1 deletion plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,15 @@ func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) {
s.CleanUp = s.cleanUp
}

func (s *GRPCStorageIntegrationTestSuite) cleanUp(t *testing.T) {
func (s *GRPCStorageIntegrationTestSuite) close(t *testing.T) {
require.NoError(t, s.factory.Close())
if s.useRemoteStorage {
s.remoteStorage.Close(t)
}
}

func (s *GRPCStorageIntegrationTestSuite) cleanUp(t *testing.T) {
s.close(t)
s.initialize(t)
}

Expand Down Expand Up @@ -108,6 +112,7 @@ func TestGRPCStorage(t *testing.T) {
flags: flags,
}
s.initialize(t)
defer s.close(t)
s.RunAll(t)
}

Expand All @@ -124,6 +129,7 @@ func TestGRPCStreamingWriter(t *testing.T) {
flags: flags,
}
s.initialize(t)
defer s.close(t)
s.RunAll(t)
}

Expand All @@ -139,5 +145,6 @@ func TestGRPCRemoteStorage(t *testing.T) {
useRemoteStorage: true,
}
s.initialize(t)
defer s.close(t)
s.RunAll(t)
}
Loading