Skip to content

Commit

Permalink
Fix goroutine leaks in grpc integration tests
Browse files Browse the repository at this point in the history
The key issue was that the grpc client was only being killed in
a `runtime.SetFinalizer` - i.e. when it is GCed. I think in the
tests this was not shutting down before goleak had decided that
the goroutine had leaked.

This change switches to a more conventional approach of killing
the client as part of the Close method and then ensuring this
is called in each of the tests.

While this change does not include a call to active goleak, it
was tested with this. It does not include goleak detection
because there are still other violations related to elasticsearch
in this package.

Signed-off-by: Will Sewell <[email protected]>
  • Loading branch information
Will Sewell committed Apr 8, 2024
1 parent ea53d8e commit e1db93f
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 11 deletions.
18 changes: 11 additions & 7 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"os/exec"
"runtime"
"time"

"github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -56,7 +55,15 @@ type Configuration struct {
// ClientPluginServices defines services plugin can expose and its capabilities
type ClientPluginServices struct {
shared.PluginServices
Capabilities shared.PluginCapabilities
Capabilities shared.PluginCapabilities
killPluginClient func()
}

func (c *ClientPluginServices) Close() error {
if c.killPluginClient != nil {
c.killPluginClient()
}
return nil
}

// PluginBuilder is used to create storage plugins. Implemented by Configuration.
Expand Down Expand Up @@ -153,10 +160,6 @@ func (c *Configuration) buildPlugin(logger *zap.Logger, tracerProvider trace.Tra
GRPCDialOptions: opts,
})

runtime.SetFinalizer(client, func(c *plugin.Client) {
c.Kill()
})

rpcClient, err := client.Client()
if err != nil {
return nil, fmt.Errorf("error attempting to connect to plugin rpc client: %w", err)
Expand Down Expand Up @@ -199,7 +202,8 @@ func (c *Configuration) buildPlugin(logger *zap.Logger, tracerProvider trace.Tra
ArchiveStore: archiveStoragePlugin,
StreamingSpanWriter: streamingSpanWriterPlugin,
},
Capabilities: capabilities,
Capabilities: capabilities,
killPluginClient: client.Kill,
}, nil
}

Expand Down
9 changes: 8 additions & 1 deletion plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package grpc

import (
"errors"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -53,6 +54,8 @@ type Factory struct {
archiveStore shared.ArchiveStoragePlugin
streamingSpanWriter shared.StreamingSpanWriterPlugin
capabilities shared.PluginCapabilities

servicesCloser io.Closer
}

// NewFactory creates a new Factory.
Expand Down Expand Up @@ -108,6 +111,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
f.archiveStore = services.ArchiveStore
f.capabilities = services.Capabilities
f.streamingSpanWriter = services.StreamingSpanWriter
f.servicesCloser = services
logger.Info("External plugin storage configuration", zap.Any("configuration", f.options.Configuration))
return nil
}
Expand Down Expand Up @@ -164,5 +168,8 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {

// Close closes the resources held by the factory
func (f *Factory) Close() error {
return f.builder.Close()
errs := []error{}
errs = append(errs, f.servicesCloser.Close())
errs = append(errs, f.builder.Close())
return errors.Join(errs...)
}
22 changes: 19 additions & 3 deletions plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ func newgRPCServer() (*gRPCServer, error) {
func (s *gRPCServer) Restart() error {
// stop the server if one already exists
if s.server != nil {
s.server.GracefulStop()
s.wg.Wait()
s.close()
select {
case err := <-s.errChan:
return err
Expand Down Expand Up @@ -91,6 +90,13 @@ func (s *gRPCServer) Restart() error {
return nil
}

func (s *gRPCServer) close() {
if s.server != nil {
s.server.GracefulStop()
s.wg.Wait()
}
}

type GRPCStorageIntegrationTestSuite struct {
StorageIntegration
logger *zap.Logger
Expand Down Expand Up @@ -131,9 +137,16 @@ func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) {
s.CleanUp = s.cleanUp
}

func (s *GRPCStorageIntegrationTestSuite) cleanUp(t *testing.T) {
func (s *GRPCStorageIntegrationTestSuite) close(t *testing.T) {
if s.server != nil {
s.server.close()
}
err := s.factory.Close()
require.NoError(t, err)
}

func (s *GRPCStorageIntegrationTestSuite) cleanUp(t *testing.T) {
s.close(t)
s.initialize(t)
}

Expand Down Expand Up @@ -164,6 +177,7 @@ func TestGRPCStorage(t *testing.T) {
}
s.initialize(t)
s.RunAll(t)
s.close(t)
}

func TestGRPCStreamingWriter(t *testing.T) {
Expand All @@ -180,6 +194,7 @@ func TestGRPCStreamingWriter(t *testing.T) {
}
s.initialize(t)
s.RunAll(t)
s.close(t)
}

func TestGRPCRemoteStorage(t *testing.T) {
Expand All @@ -197,4 +212,5 @@ func TestGRPCRemoteStorage(t *testing.T) {
}
s.initialize(t)
s.RunAll(t)
s.close(t)
}

0 comments on commit e1db93f

Please sign in to comment.