diff --git a/pkg/plugin/acceptance_testing.go b/pkg/plugin/acceptance_testing.go index 5d6d396d6..ac263b463 100644 --- a/pkg/plugin/acceptance_testing.go +++ b/pkg/plugin/acceptance_testing.go @@ -62,6 +62,7 @@ func AcceptanceTestV1(t *testing.T, tdf testDispenserFunc) { run(t, tdf, testSource_Lifecycle_OnCreated) run(t, tdf, testSource_Lifecycle_OnUpdated) run(t, tdf, testSource_Lifecycle_OnDeleted) + run(t, tdf, testSource_BlockingFunctions) // destination tests run(t, tdf, testDestination_Configure_Success) @@ -78,6 +79,7 @@ func AcceptanceTestV1(t *testing.T, tdf testDispenserFunc) { run(t, tdf, testDestination_Lifecycle_OnCreated) run(t, tdf, testDestination_Lifecycle_OnUpdated) run(t, tdf, testDestination_Lifecycle_OnDeleted) + run(t, tdf, testDestination_BlockingFunctions) } func run(t *testing.T, tdf testDispenserFunc, test func(*testing.T, testDispenserFunc)) { @@ -636,6 +638,105 @@ func testSource_Lifecycle_OnDeleted(t *testing.T, tdf testDispenserFunc) { is.NoErr(err) } +func testSource_BlockingFunctions(t *testing.T, tdf testDispenserFunc) { + testCases := []struct { + name string + prepareExpectation func(m *mock.SourcePlugin, blockUntil chan struct{}) + callFn func(context.Context, SourcePlugin) error + }{{ + name: "Configure", + prepareExpectation: func(m *mock.SourcePlugin, blockUntil chan struct{}) { + m.EXPECT(). + Configure(gomock.Any(), cpluginv1.SourceConfigureRequest{}). + Do(func(context.Context, cpluginv1.SourceConfigureRequest) { + <-blockUntil + }) + }, + callFn: func(ctx context.Context, d SourcePlugin) error { + return d.Configure(ctx, map[string]string{}) + }, + }, { + name: "Start", + prepareExpectation: func(m *mock.SourcePlugin, blockUntil chan struct{}) { + m.EXPECT(). + Start(gomock.Any(), cpluginv1.SourceStartRequest{}). + Do(func(context.Context, cpluginv1.SourceStartRequest) { + <-blockUntil + }) + }, + callFn: func(ctx context.Context, d SourcePlugin) error { + return d.Start(ctx, nil) + }, + }, { + name: "Stop", + prepareExpectation: func(m *mock.SourcePlugin, blockUntil chan struct{}) { + m.EXPECT(). + Stop(gomock.Any(), cpluginv1.SourceStopRequest{}). + Do(func(context.Context, cpluginv1.SourceStopRequest) { + <-blockUntil + }) + }, + callFn: func(ctx context.Context, d SourcePlugin) error { + _, err := d.Stop(ctx) + return err + }, + }, { + name: "Teardown", + prepareExpectation: func(m *mock.SourcePlugin, blockUntil chan struct{}) { + m.EXPECT(). + Teardown(gomock.Any(), cpluginv1.SourceTeardownRequest{}). + Do(func(context.Context, cpluginv1.SourceTeardownRequest) { + <-blockUntil + }) + }, + callFn: func(ctx context.Context, d SourcePlugin) error { + return d.Teardown(ctx) + }, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + is := is.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dispenser, _, mockSource, _ := tdf(t) + + blockUntil := make(chan struct{}) + tc.prepareExpectation(mockSource, blockUntil) + + source, err := dispenser.DispenseSource() + is.NoErr(err) + + fnErr := make(chan error) + go func() { + // call function in goroutine, because the mock will block + fnErr <- tc.callFn(ctx, source) + }() + + // ensure that the call to the function is blocked + select { + case <-fnErr: + t.Fatal("plugin call should block") + case <-time.After(time.Second): + } + + // cancelling the context should unblock the call, regardless if the + // mock is still blocking + cancel() + select { + case err = <-fnErr: + is.Equal(err, context.Canceled) + case <-time.After(time.Second): + t.Fatal("call to plugin should have stopped blocking") + } + + // release the blocked call to the mock + close(blockUntil) + }) + } +} + // ----------------- // -- DESTINATION -- // ----------------- @@ -1049,3 +1150,101 @@ func testDestination_Lifecycle_OnDeleted(t *testing.T, tdf testDispenserFunc) { err = destination.LifecycleOnDeleted(ctx, want) is.NoErr(err) } + +func testDestination_BlockingFunctions(t *testing.T, tdf testDispenserFunc) { + testCases := []struct { + name string + prepareExpectation func(m *mock.DestinationPlugin, blockUntil chan struct{}) + callFn func(context.Context, DestinationPlugin) error + }{{ + name: "Configure", + prepareExpectation: func(m *mock.DestinationPlugin, blockUntil chan struct{}) { + m.EXPECT(). + Configure(gomock.Any(), cpluginv1.DestinationConfigureRequest{}). + Do(func(context.Context, cpluginv1.DestinationConfigureRequest) { + <-blockUntil + }) + }, + callFn: func(ctx context.Context, d DestinationPlugin) error { + return d.Configure(ctx, map[string]string{}) + }, + }, { + name: "Start", + prepareExpectation: func(m *mock.DestinationPlugin, blockUntil chan struct{}) { + m.EXPECT(). + Start(gomock.Any(), cpluginv1.DestinationStartRequest{}). + Do(func(context.Context, cpluginv1.DestinationStartRequest) { + <-blockUntil + }) + }, + callFn: func(ctx context.Context, d DestinationPlugin) error { + return d.Start(ctx) + }, + }, { + name: "Stop", + prepareExpectation: func(m *mock.DestinationPlugin, blockUntil chan struct{}) { + m.EXPECT(). + Stop(gomock.Any(), cpluginv1.DestinationStopRequest{}). + Do(func(context.Context, cpluginv1.DestinationStopRequest) { + <-blockUntil + }) + }, + callFn: func(ctx context.Context, d DestinationPlugin) error { + return d.Stop(ctx, nil) + }, + }, { + name: "Teardown", + prepareExpectation: func(m *mock.DestinationPlugin, blockUntil chan struct{}) { + m.EXPECT(). + Teardown(gomock.Any(), cpluginv1.DestinationTeardownRequest{}). + Do(func(context.Context, cpluginv1.DestinationTeardownRequest) { + <-blockUntil + }) + }, + callFn: func(ctx context.Context, d DestinationPlugin) error { + return d.Teardown(ctx) + }, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + is := is.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dispenser, _, _, mockDestination := tdf(t) + + blockUntil := make(chan struct{}) + tc.prepareExpectation(mockDestination, blockUntil) + + destination, err := dispenser.DispenseDestination() + is.NoErr(err) + + fnErr := make(chan error) + go func() { + // call function in goroutine, because the mock will block + fnErr <- tc.callFn(ctx, destination) + }() + + // ensure that the call to the function is blocked + select { + case <-fnErr: + t.Fatal("plugin call should block") + case <-time.After(time.Second): + } + + // cancelling the context should unblock the call, regardless if the + // mock is still blocking + cancel() + select { + case err = <-fnErr: + is.Equal(err, context.Canceled) + case <-time.After(time.Second): + t.Fatal("call to plugin should have stopped blocking") + } + + // release the blocked call to the mock + close(blockUntil) + }) + } +} diff --git a/pkg/plugin/builtin/v1/destination.go b/pkg/plugin/builtin/v1/destination.go index 27dfb140a..e15867796 100644 --- a/pkg/plugin/builtin/v1/destination.go +++ b/pkg/plugin/builtin/v1/destination.go @@ -58,7 +58,7 @@ func (s *destinationPluginAdapter) withLogger(ctx context.Context) context.Conte func (s *destinationPluginAdapter) Configure(ctx context.Context, cfg map[string]string) error { s.logger.Trace(ctx).Msg("calling Configure") - _, err := runSandbox(s.impl.Configure, s.withLogger(ctx), toplugin.DestinationConfigureRequest(cfg)) + _, err := runSandbox(s.impl.Configure, s.withLogger(ctx), toplugin.DestinationConfigureRequest(cfg), s.logger) return err } @@ -69,7 +69,7 @@ func (s *destinationPluginAdapter) Start(ctx context.Context) error { req := toplugin.DestinationStartRequest() s.logger.Trace(ctx).Msg("calling Start") - _, err := runSandbox(s.impl.Start, s.withLogger(ctx), req) + _, err := runSandbox(s.impl.Start, s.withLogger(ctx), req, s.logger) if err != nil { return err } @@ -77,7 +77,7 @@ func (s *destinationPluginAdapter) Start(ctx context.Context) error { s.stream = newDestinationRunStream(ctx) go func() { s.logger.Trace(ctx).Msg("calling Run") - err := runSandboxNoResp(s.impl.Run, s.withLogger(ctx), cpluginv1.DestinationRunStream(s.stream)) + err := runSandboxNoResp(s.impl.Run, s.withLogger(ctx), cpluginv1.DestinationRunStream(s.stream), s.logger) if err != nil { if !s.stream.stop(err) { s.logger.Err(ctx, err).Msg("stream already stopped") @@ -130,36 +130,32 @@ func (s *destinationPluginAdapter) Ack(ctx context.Context) (record.Position, er } func (s *destinationPluginAdapter) Stop(ctx context.Context, lastPosition record.Position) error { - if s.stream == nil { - return plugin.ErrStreamNotOpen - } - s.logger.Trace(ctx).Bytes(log.RecordPositionField, lastPosition).Msg("calling Stop") - _, err := runSandbox(s.impl.Stop, s.withLogger(ctx), toplugin.DestinationStopRequest(lastPosition)) + _, err := runSandbox(s.impl.Stop, s.withLogger(ctx), toplugin.DestinationStopRequest(lastPosition), s.logger) return err } func (s *destinationPluginAdapter) Teardown(ctx context.Context) error { s.logger.Trace(ctx).Msg("calling Teardown") - _, err := runSandbox(s.impl.Teardown, s.withLogger(ctx), toplugin.DestinationTeardownRequest()) + _, err := runSandbox(s.impl.Teardown, s.withLogger(ctx), toplugin.DestinationTeardownRequest(), s.logger) return err } func (s *destinationPluginAdapter) LifecycleOnCreated(ctx context.Context, cfg map[string]string) error { s.logger.Trace(ctx).Msg("calling LifecycleOnCreated") - _, err := runSandbox(s.impl.LifecycleOnCreated, s.withLogger(ctx), toplugin.DestinationLifecycleOnCreatedRequest(cfg)) + _, err := runSandbox(s.impl.LifecycleOnCreated, s.withLogger(ctx), toplugin.DestinationLifecycleOnCreatedRequest(cfg), s.logger) return err } func (s *destinationPluginAdapter) LifecycleOnUpdated(ctx context.Context, cfgBefore, cfgAfter map[string]string) error { s.logger.Trace(ctx).Msg("calling LifecycleOnUpdated") - _, err := runSandbox(s.impl.LifecycleOnUpdated, s.withLogger(ctx), toplugin.DestinationLifecycleOnUpdatedRequest(cfgBefore, cfgAfter)) + _, err := runSandbox(s.impl.LifecycleOnUpdated, s.withLogger(ctx), toplugin.DestinationLifecycleOnUpdatedRequest(cfgBefore, cfgAfter), s.logger) return err } func (s *destinationPluginAdapter) LifecycleOnDeleted(ctx context.Context, cfg map[string]string) error { s.logger.Trace(ctx).Msg("calling LifecycleOnDeleted") - _, err := runSandbox(s.impl.LifecycleOnDeleted, s.withLogger(ctx), toplugin.DestinationLifecycleOnDeletedRequest(cfg)) + _, err := runSandbox(s.impl.LifecycleOnDeleted, s.withLogger(ctx), toplugin.DestinationLifecycleOnDeletedRequest(cfg), s.logger) return err } diff --git a/pkg/plugin/builtin/v1/dispenser.go b/pkg/plugin/builtin/v1/dispenser.go index 31e4b9e32..ebbed428e 100644 --- a/pkg/plugin/builtin/v1/dispenser.go +++ b/pkg/plugin/builtin/v1/dispenser.go @@ -45,7 +45,7 @@ func NewDispenser( } func (d *Dispenser) DispenseSpecifier() (plugin.SpecifierPlugin, error) { - return newSpecifierPluginAdapter(d.specifierPlugin()), nil + return newSpecifierPluginAdapter(d.specifierPlugin(), d.pluginLogger("specifier")), nil } func (d *Dispenser) DispenseSource() (plugin.SourcePlugin, error) { diff --git a/pkg/plugin/builtin/v1/internal/fromplugin/specifier.go b/pkg/plugin/builtin/v1/internal/fromplugin/specifier.go index cec742214..b5257a3d3 100644 --- a/pkg/plugin/builtin/v1/internal/fromplugin/specifier.go +++ b/pkg/plugin/builtin/v1/internal/fromplugin/specifier.go @@ -70,9 +70,11 @@ func SpecifierParameter(in cpluginv1.SpecifierParameter) plugin.Parameter { requiredExists = true } } - // needed for backward compatibility, in.Required is converted to a validation of type ValidationTypeRequired - // making sure not to duplicate the required validation + //nolint:staticcheck // needed for backward compatibility, in.Required is + // converted to a validation of type ValidationTypeRequired making sure not + // to duplicate the required validation if in.Required && !requiredExists { + //nolint:makezero // false positive, we actually want to append here validations = append(validations, plugin.Validation{ Type: plugin.ValidationTypeRequired, }) diff --git a/pkg/plugin/builtin/v1/sandbox.go b/pkg/plugin/builtin/v1/sandbox.go index ed85aa44d..25c2b9051 100644 --- a/pkg/plugin/builtin/v1/sandbox.go +++ b/pkg/plugin/builtin/v1/sandbox.go @@ -16,10 +16,18 @@ package builtinv1 import ( "context" + "sync" "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" ) +var sandboxChanPool = sync.Pool{ + New: func() any { + return make(chan any) + }, +} + // runSandbox takes a function and runs it in a sandboxed mode that catches // panics and converts them into an error instead. // It is specifically designed to run functions that take a context and a @@ -28,27 +36,75 @@ func runSandbox[REQ any, RES any]( f func(context.Context, REQ) (RES, error), ctx context.Context, // context is the second parameter on purpose req REQ, -) (res RES, err error) { - defer func() { - if r := recover(); r != nil { - panicErr, ok := r.(error) - if !ok { - panicErr = cerrors.Errorf("panic: %v", r) - } - err = panicErr + logger log.CtxLogger, +) (RES, error) { + c := sandboxChanPool.Get().(chan any) + + returnResponse := func(ctx context.Context, res RES, err error) { + defer sandboxChanPool.Put(c) + select { + case <-ctx.Done(): + // The context was cancelled, nobody will fetch the result. + logger.Error(ctx). + Any("response", res). + Err(err). + Msg("context cancelled when trying to return response from builtin connector plugin (this message comes from a detached plugin)") + case c <- res: + // The result was sent, now send the error if any and return the + // channel to the pool. + c <- err } + } + + go func() { + defer func() { + if r := recover(); r != nil { + err, ok := r.(error) + if !ok { + err = cerrors.Errorf("panic: %v", r) + } + // return the panic error + var emptyRes RES + returnResponse(ctx, emptyRes, err) + } + }() + + res, err := f(ctx, req) + returnResponse(ctx, res, err) }() - return f(ctx, req) + + select { + case <-ctx.Done(): + // Context was cancelled, detach from calling goroutine and return. + logger.Error(ctx).Msg("context cancelled while waiting for builtin connector plugin to respond, detaching from plugin") + var emptyRes RES + return emptyRes, ctx.Err() + case v := <-c: + var res RES + var err error + + // We got a response, which means the goroutine will send another value + // (the error) and then return the channel to the pool. + if v != nil { + res = v.(RES) + } + v = <-c + if v != nil { + err = v.(error) + } + return res, err + } } func runSandboxNoResp[REQ any]( f func(context.Context, REQ) error, ctx context.Context, // context is the second parameter on purpose req REQ, + logger log.CtxLogger, ) error { _, err := runSandbox(func(ctx context.Context, req REQ) (any, error) { err := f(ctx, req) return nil, err - }, ctx, req) + }, ctx, req, logger) return err } diff --git a/pkg/plugin/builtin/v1/sandbox_test.go b/pkg/plugin/builtin/v1/sandbox_test.go index cda86d5df..aee9d4c79 100644 --- a/pkg/plugin/builtin/v1/sandbox_test.go +++ b/pkg/plugin/builtin/v1/sandbox_test.go @@ -19,13 +19,16 @@ import ( "testing" "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" "github.com/matryer/is" + "github.com/rs/zerolog" ) func TestRunSandbox(t *testing.T) { ctx := context.Background() is := is.New(t) haveErr := cerrors.New("test error") + logger := log.New(zerolog.New(zerolog.NewTestWriter(t))) type testReq struct { foo string @@ -84,7 +87,7 @@ func TestRunSandbox(t *testing.T) { for _, td := range testData { t.Run(td.name, func(t *testing.T) { - gotResp, gotErr := runSandbox(td.f, ctx, td.req) + gotResp, gotErr := runSandbox(td.f, ctx, td.req, logger) is.Equal(gotResp, td.resp) if td.strict { // strict mode means we expect a very specific error @@ -100,7 +103,9 @@ func TestRunSandbox(t *testing.T) { func TestRunSandboxNoResp(t *testing.T) { ctx := context.Background() is := is.New(t) + logger := log.New(zerolog.New(zerolog.NewTestWriter(t))) + wantErr := cerrors.New("test error") - gotErr := runSandboxNoResp(func(context.Context, any) error { panic(wantErr) }, ctx, nil) + gotErr := runSandboxNoResp(func(context.Context, any) error { panic(wantErr) }, ctx, nil, logger) is.Equal(gotErr, wantErr) } diff --git a/pkg/plugin/builtin/v1/source.go b/pkg/plugin/builtin/v1/source.go index e184fecb8..71b15b6f3 100644 --- a/pkg/plugin/builtin/v1/source.go +++ b/pkg/plugin/builtin/v1/source.go @@ -59,7 +59,7 @@ func (s *sourcePluginAdapter) withLogger(ctx context.Context) context.Context { func (s *sourcePluginAdapter) Configure(ctx context.Context, cfg map[string]string) error { s.logger.Trace(ctx).Msg("calling Configure") - _, err := runSandbox(s.impl.Configure, s.withLogger(ctx), toplugin.SourceConfigureRequest(cfg)) + _, err := runSandbox(s.impl.Configure, s.withLogger(ctx), toplugin.SourceConfigureRequest(cfg), s.logger) return err } @@ -71,7 +71,7 @@ func (s *sourcePluginAdapter) Start(ctx context.Context, p record.Position) erro req := toplugin.SourceStartRequest(p) s.logger.Trace(ctx).Msg("calling Start") - resp, err := runSandbox(s.impl.Start, s.withLogger(ctx), req) + resp, err := runSandbox(s.impl.Start, s.withLogger(ctx), req, s.logger) if err != nil { return err } @@ -80,7 +80,7 @@ func (s *sourcePluginAdapter) Start(ctx context.Context, p record.Position) erro s.stream = newSourceRunStream(ctx) go func() { s.logger.Trace(ctx).Msg("calling Run") - err := runSandboxNoResp(s.impl.Run, s.withLogger(ctx), cpluginv1.SourceRunStream(s.stream)) + err := runSandboxNoResp(s.impl.Run, s.withLogger(ctx), cpluginv1.SourceRunStream(s.stream), s.logger) if err != nil { if !s.stream.stop(err) { s.logger.Err(ctx, err).Msg("stream already stopped") @@ -130,12 +130,8 @@ func (s *sourcePluginAdapter) Ack(ctx context.Context, p record.Position) error } func (s *sourcePluginAdapter) Stop(ctx context.Context) (record.Position, error) { - if s.stream == nil { - return nil, plugin.ErrStreamNotOpen - } - s.logger.Trace(ctx).Msg("calling Stop") - resp, err := runSandbox(s.impl.Stop, s.withLogger(ctx), toplugin.SourceStopRequest()) + resp, err := runSandbox(s.impl.Stop, s.withLogger(ctx), toplugin.SourceStopRequest(), s.logger) if err != nil { return nil, err } @@ -144,25 +140,25 @@ func (s *sourcePluginAdapter) Stop(ctx context.Context) (record.Position, error) func (s *sourcePluginAdapter) Teardown(ctx context.Context) error { s.logger.Trace(ctx).Msg("calling Teardown") - _, err := runSandbox(s.impl.Teardown, s.withLogger(ctx), toplugin.SourceTeardownRequest()) + _, err := runSandbox(s.impl.Teardown, s.withLogger(ctx), toplugin.SourceTeardownRequest(), s.logger) return err } func (s *sourcePluginAdapter) LifecycleOnCreated(ctx context.Context, cfg map[string]string) error { s.logger.Trace(ctx).Msg("calling LifecycleOnCreated") - _, err := runSandbox(s.impl.LifecycleOnCreated, s.withLogger(ctx), toplugin.SourceLifecycleOnCreatedRequest(cfg)) + _, err := runSandbox(s.impl.LifecycleOnCreated, s.withLogger(ctx), toplugin.SourceLifecycleOnCreatedRequest(cfg), s.logger) return err } func (s *sourcePluginAdapter) LifecycleOnUpdated(ctx context.Context, cfgBefore, cfgAfter map[string]string) error { s.logger.Trace(ctx).Msg("calling LifecycleOnUpdated") - _, err := runSandbox(s.impl.LifecycleOnUpdated, s.withLogger(ctx), toplugin.SourceLifecycleOnUpdatedRequest(cfgBefore, cfgAfter)) + _, err := runSandbox(s.impl.LifecycleOnUpdated, s.withLogger(ctx), toplugin.SourceLifecycleOnUpdatedRequest(cfgBefore, cfgAfter), s.logger) return err } func (s *sourcePluginAdapter) LifecycleOnDeleted(ctx context.Context, cfg map[string]string) error { s.logger.Trace(ctx).Msg("calling LifecycleOnDeleted") - _, err := runSandbox(s.impl.LifecycleOnDeleted, s.withLogger(ctx), toplugin.SourceLifecycleOnDeletedRequest(cfg)) + _, err := runSandbox(s.impl.LifecycleOnDeleted, s.withLogger(ctx), toplugin.SourceLifecycleOnDeletedRequest(cfg), s.logger) return err } diff --git a/pkg/plugin/builtin/v1/specifier.go b/pkg/plugin/builtin/v1/specifier.go index e2bc2ffc6..d21fedcf5 100644 --- a/pkg/plugin/builtin/v1/specifier.go +++ b/pkg/plugin/builtin/v1/specifier.go @@ -18,27 +18,30 @@ import ( "context" "github.com/conduitio/conduit-connector-protocol/cpluginv1" + "github.com/conduitio/conduit/pkg/foundation/log" "github.com/conduitio/conduit/pkg/plugin" "github.com/conduitio/conduit/pkg/plugin/builtin/v1/internal/fromplugin" "github.com/conduitio/conduit/pkg/plugin/builtin/v1/internal/toplugin" ) -// TODO make sure a panic in a plugin doesn't crash Conduit type specifierPluginAdapter struct { impl cpluginv1.SpecifierPlugin + // logger is used as the internal logger of specifierPluginAdapter. + logger log.CtxLogger } var _ plugin.SpecifierPlugin = (*specifierPluginAdapter)(nil) -func newSpecifierPluginAdapter(impl cpluginv1.SpecifierPlugin) *specifierPluginAdapter { +func newSpecifierPluginAdapter(impl cpluginv1.SpecifierPlugin, logger log.CtxLogger) *specifierPluginAdapter { return &specifierPluginAdapter{ - impl: impl, + impl: impl, + logger: logger.WithComponent("builtinv1.specifierPluginAdapter"), } } func (s *specifierPluginAdapter) Specify() (plugin.Specification, error) { req := toplugin.SpecifierSpecifyRequest() - resp, err := runSandbox(s.impl.Specify, context.Background(), req) + resp, err := runSandbox(s.impl.Specify, context.Background(), req, s.logger) if err != nil { return plugin.Specification{}, err } diff --git a/pkg/plugin/service.go b/pkg/plugin/service.go index 385010127..1d74556b6 100644 --- a/pkg/plugin/service.go +++ b/pkg/plugin/service.go @@ -56,8 +56,6 @@ func (s *Service) Check(_ context.Context) error { } func (s *Service) NewDispenser(logger log.CtxLogger, name string) (Dispenser, error) { - logger = logger.WithComponent("plugin") - fullName := FullName(name) switch fullName.PluginType() { case PluginTypeStandalone: diff --git a/pkg/plugin/standalone/registry.go b/pkg/plugin/standalone/registry.go index 6d3cd975d..2a8be1c39 100644 --- a/pkg/plugin/standalone/registry.go +++ b/pkg/plugin/standalone/registry.go @@ -191,6 +191,7 @@ func (r *Registry) NewDispenser(logger log.CtxLogger, fullName plugin.FullName) return nil, cerrors.Errorf("could not find standalone plugin, only found versions %v: %w", availableVersions, plugin.ErrPluginNotFound) } + logger = logger.WithComponent("plugin.standalone") return standalonev1.NewDispenser(logger.ZerologWithComponent(), bp.path) } diff --git a/pkg/plugin/standalone/v1/destination.go b/pkg/plugin/standalone/v1/destination.go index b8d44b6d5..091b19d16 100644 --- a/pkg/plugin/standalone/v1/destination.go +++ b/pkg/plugin/standalone/v1/destination.go @@ -118,10 +118,6 @@ func (s *destinationPluginClient) Ack(_ context.Context) (record.Position, error } func (s *destinationPluginClient) Stop(ctx context.Context, lastPosition record.Position) error { - if s.stream == nil { - return plugin.ErrStreamNotOpen - } - protoReq := toproto.DestinationStopRequest(lastPosition) _, err := s.grpcClient.Stop(ctx, protoReq) if err != nil { diff --git a/pkg/plugin/standalone/v1/source.go b/pkg/plugin/standalone/v1/source.go index a69612856..0a6f2d92a 100644 --- a/pkg/plugin/standalone/v1/source.go +++ b/pkg/plugin/standalone/v1/source.go @@ -112,10 +112,6 @@ func (s *sourcePluginClient) Ack(_ context.Context, p record.Position) error { } func (s *sourcePluginClient) Stop(ctx context.Context) (record.Position, error) { - if s.stream == nil { - return nil, plugin.ErrStreamNotOpen - } - protoReq := toproto.SourceStopRequest() protoResp, err := s.grpcClient.Stop(ctx, protoReq) if err != nil {