Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve builtin plugin sandbox #1380

Merged
merged 5 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 199 additions & 0 deletions pkg/plugin/acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func AcceptanceTestV1(t *testing.T, tdf testDispenserFunc) {
run(t, tdf, testSource_Lifecycle_OnCreated)
run(t, tdf, testSource_Lifecycle_OnUpdated)
run(t, tdf, testSource_Lifecycle_OnDeleted)
run(t, tdf, testSource_BlockingFunctions)

// destination tests
run(t, tdf, testDestination_Configure_Success)
Expand All @@ -78,6 +79,7 @@ func AcceptanceTestV1(t *testing.T, tdf testDispenserFunc) {
run(t, tdf, testDestination_Lifecycle_OnCreated)
run(t, tdf, testDestination_Lifecycle_OnUpdated)
run(t, tdf, testDestination_Lifecycle_OnDeleted)
run(t, tdf, testDestination_BlockingFunctions)
}

func run(t *testing.T, tdf testDispenserFunc, test func(*testing.T, testDispenserFunc)) {
Expand Down Expand Up @@ -636,6 +638,105 @@ func testSource_Lifecycle_OnDeleted(t *testing.T, tdf testDispenserFunc) {
is.NoErr(err)
}

func testSource_BlockingFunctions(t *testing.T, tdf testDispenserFunc) {
testCases := []struct {
name string
prepareExpectation func(m *mock.SourcePlugin, blockUntil chan struct{})
callFn func(context.Context, SourcePlugin) error
}{{
name: "Configure",
prepareExpectation: func(m *mock.SourcePlugin, blockUntil chan struct{}) {
m.EXPECT().
Configure(gomock.Any(), cpluginv1.SourceConfigureRequest{}).
Do(func(context.Context, cpluginv1.SourceConfigureRequest) {
<-blockUntil
})
},
callFn: func(ctx context.Context, d SourcePlugin) error {
return d.Configure(ctx, map[string]string{})
},
}, {
name: "Start",
prepareExpectation: func(m *mock.SourcePlugin, blockUntil chan struct{}) {
m.EXPECT().
Start(gomock.Any(), cpluginv1.SourceStartRequest{}).
Do(func(context.Context, cpluginv1.SourceStartRequest) {
<-blockUntil
})
},
callFn: func(ctx context.Context, d SourcePlugin) error {
return d.Start(ctx, nil)
},
}, {
name: "Stop",
prepareExpectation: func(m *mock.SourcePlugin, blockUntil chan struct{}) {
m.EXPECT().
Stop(gomock.Any(), cpluginv1.SourceStopRequest{}).
Do(func(context.Context, cpluginv1.SourceStopRequest) {
<-blockUntil
})
},
callFn: func(ctx context.Context, d SourcePlugin) error {
_, err := d.Stop(ctx)
return err
},
}, {
name: "Teardown",
prepareExpectation: func(m *mock.SourcePlugin, blockUntil chan struct{}) {
m.EXPECT().
Teardown(gomock.Any(), cpluginv1.SourceTeardownRequest{}).
Do(func(context.Context, cpluginv1.SourceTeardownRequest) {
<-blockUntil
})
},
callFn: func(ctx context.Context, d SourcePlugin) error {
return d.Teardown(ctx)
},
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dispenser, _, mockSource, _ := tdf(t)

blockUntil := make(chan struct{})
tc.prepareExpectation(mockSource, blockUntil)

source, err := dispenser.DispenseSource()
is.NoErr(err)

fnErr := make(chan error)
go func() {
// call function in goroutine, because the mock will block
fnErr <- tc.callFn(ctx, source)
}()

// ensure that the call to the function is blocked
select {
case <-fnErr:
t.Fatal("plugin call should block")
case <-time.After(time.Second):
}

// cancelling the context should unblock the call, regardless if the
// mock is still blocking
cancel()
select {
case err = <-fnErr:
is.Equal(err, context.Canceled)
case <-time.After(time.Second):
t.Fatal("call to plugin should have stopped blocking")
}

// release the blocked call to the mock
close(blockUntil)
})
}
}

// -----------------
// -- DESTINATION --
// -----------------
Expand Down Expand Up @@ -1049,3 +1150,101 @@ func testDestination_Lifecycle_OnDeleted(t *testing.T, tdf testDispenserFunc) {
err = destination.LifecycleOnDeleted(ctx, want)
is.NoErr(err)
}

func testDestination_BlockingFunctions(t *testing.T, tdf testDispenserFunc) {
testCases := []struct {
name string
prepareExpectation func(m *mock.DestinationPlugin, blockUntil chan struct{})
callFn func(context.Context, DestinationPlugin) error
}{{
name: "Configure",
prepareExpectation: func(m *mock.DestinationPlugin, blockUntil chan struct{}) {
m.EXPECT().
Configure(gomock.Any(), cpluginv1.DestinationConfigureRequest{}).
Do(func(context.Context, cpluginv1.DestinationConfigureRequest) {
<-blockUntil
})
},
callFn: func(ctx context.Context, d DestinationPlugin) error {
return d.Configure(ctx, map[string]string{})
},
}, {
name: "Start",
prepareExpectation: func(m *mock.DestinationPlugin, blockUntil chan struct{}) {
m.EXPECT().
Start(gomock.Any(), cpluginv1.DestinationStartRequest{}).
Do(func(context.Context, cpluginv1.DestinationStartRequest) {
<-blockUntil
})
},
callFn: func(ctx context.Context, d DestinationPlugin) error {
return d.Start(ctx)
},
}, {
name: "Stop",
prepareExpectation: func(m *mock.DestinationPlugin, blockUntil chan struct{}) {
m.EXPECT().
Stop(gomock.Any(), cpluginv1.DestinationStopRequest{}).
Do(func(context.Context, cpluginv1.DestinationStopRequest) {
<-blockUntil
})
},
callFn: func(ctx context.Context, d DestinationPlugin) error {
return d.Stop(ctx, nil)
},
}, {
name: "Teardown",
prepareExpectation: func(m *mock.DestinationPlugin, blockUntil chan struct{}) {
m.EXPECT().
Teardown(gomock.Any(), cpluginv1.DestinationTeardownRequest{}).
Do(func(context.Context, cpluginv1.DestinationTeardownRequest) {
<-blockUntil
})
},
callFn: func(ctx context.Context, d DestinationPlugin) error {
return d.Teardown(ctx)
},
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dispenser, _, _, mockDestination := tdf(t)

blockUntil := make(chan struct{})
tc.prepareExpectation(mockDestination, blockUntil)

destination, err := dispenser.DispenseDestination()
is.NoErr(err)

fnErr := make(chan error)
go func() {
// call function in goroutine, because the mock will block
fnErr <- tc.callFn(ctx, destination)
}()

// ensure that the call to the function is blocked
select {
case <-fnErr:
t.Fatal("plugin call should block")
case <-time.After(time.Second):
}

// cancelling the context should unblock the call, regardless if the
// mock is still blocking
cancel()
select {
case err = <-fnErr:
is.Equal(err, context.Canceled)
case <-time.After(time.Second):
t.Fatal("call to plugin should have stopped blocking")
}

// release the blocked call to the mock
close(blockUntil)
})
}
}
20 changes: 8 additions & 12 deletions pkg/plugin/builtin/v1/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s *destinationPluginAdapter) withLogger(ctx context.Context) context.Conte

func (s *destinationPluginAdapter) Configure(ctx context.Context, cfg map[string]string) error {
s.logger.Trace(ctx).Msg("calling Configure")
_, err := runSandbox(s.impl.Configure, s.withLogger(ctx), toplugin.DestinationConfigureRequest(cfg))
_, err := runSandbox(s.impl.Configure, s.withLogger(ctx), toplugin.DestinationConfigureRequest(cfg), s.logger)
return err
}

Expand All @@ -69,15 +69,15 @@ func (s *destinationPluginAdapter) Start(ctx context.Context) error {

req := toplugin.DestinationStartRequest()
s.logger.Trace(ctx).Msg("calling Start")
_, err := runSandbox(s.impl.Start, s.withLogger(ctx), req)
_, err := runSandbox(s.impl.Start, s.withLogger(ctx), req, s.logger)
if err != nil {
return err
}

s.stream = newDestinationRunStream(ctx)
go func() {
s.logger.Trace(ctx).Msg("calling Run")
err := runSandboxNoResp(s.impl.Run, s.withLogger(ctx), cpluginv1.DestinationRunStream(s.stream))
err := runSandboxNoResp(s.impl.Run, s.withLogger(ctx), cpluginv1.DestinationRunStream(s.stream), s.logger)
if err != nil {
if !s.stream.stop(err) {
s.logger.Err(ctx, err).Msg("stream already stopped")
Expand Down Expand Up @@ -130,36 +130,32 @@ func (s *destinationPluginAdapter) Ack(ctx context.Context) (record.Position, er
}

func (s *destinationPluginAdapter) Stop(ctx context.Context, lastPosition record.Position) error {
if s.stream == nil {
return plugin.ErrStreamNotOpen
}

s.logger.Trace(ctx).Bytes(log.RecordPositionField, lastPosition).Msg("calling Stop")
_, err := runSandbox(s.impl.Stop, s.withLogger(ctx), toplugin.DestinationStopRequest(lastPosition))
_, err := runSandbox(s.impl.Stop, s.withLogger(ctx), toplugin.DestinationStopRequest(lastPosition), s.logger)
return err
}

func (s *destinationPluginAdapter) Teardown(ctx context.Context) error {
s.logger.Trace(ctx).Msg("calling Teardown")
_, err := runSandbox(s.impl.Teardown, s.withLogger(ctx), toplugin.DestinationTeardownRequest())
_, err := runSandbox(s.impl.Teardown, s.withLogger(ctx), toplugin.DestinationTeardownRequest(), s.logger)
return err
}

func (s *destinationPluginAdapter) LifecycleOnCreated(ctx context.Context, cfg map[string]string) error {
s.logger.Trace(ctx).Msg("calling LifecycleOnCreated")
_, err := runSandbox(s.impl.LifecycleOnCreated, s.withLogger(ctx), toplugin.DestinationLifecycleOnCreatedRequest(cfg))
_, err := runSandbox(s.impl.LifecycleOnCreated, s.withLogger(ctx), toplugin.DestinationLifecycleOnCreatedRequest(cfg), s.logger)
return err
}

func (s *destinationPluginAdapter) LifecycleOnUpdated(ctx context.Context, cfgBefore, cfgAfter map[string]string) error {
s.logger.Trace(ctx).Msg("calling LifecycleOnUpdated")
_, err := runSandbox(s.impl.LifecycleOnUpdated, s.withLogger(ctx), toplugin.DestinationLifecycleOnUpdatedRequest(cfgBefore, cfgAfter))
_, err := runSandbox(s.impl.LifecycleOnUpdated, s.withLogger(ctx), toplugin.DestinationLifecycleOnUpdatedRequest(cfgBefore, cfgAfter), s.logger)
return err
}

func (s *destinationPluginAdapter) LifecycleOnDeleted(ctx context.Context, cfg map[string]string) error {
s.logger.Trace(ctx).Msg("calling LifecycleOnDeleted")
_, err := runSandbox(s.impl.LifecycleOnDeleted, s.withLogger(ctx), toplugin.DestinationLifecycleOnDeletedRequest(cfg))
_, err := runSandbox(s.impl.LifecycleOnDeleted, s.withLogger(ctx), toplugin.DestinationLifecycleOnDeletedRequest(cfg), s.logger)
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/plugin/builtin/v1/dispenser.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewDispenser(
}

func (d *Dispenser) DispenseSpecifier() (plugin.SpecifierPlugin, error) {
return newSpecifierPluginAdapter(d.specifierPlugin()), nil
return newSpecifierPluginAdapter(d.specifierPlugin(), d.pluginLogger("specifier")), nil
}

func (d *Dispenser) DispenseSource() (plugin.SourcePlugin, error) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/plugin/builtin/v1/internal/fromplugin/specifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@ func SpecifierParameter(in cpluginv1.SpecifierParameter) plugin.Parameter {
requiredExists = true
}
}
// needed for backward compatibility, in.Required is converted to a validation of type ValidationTypeRequired
// making sure not to duplicate the required validation
//nolint:staticcheck // needed for backward compatibility, in.Required is
// converted to a validation of type ValidationTypeRequired making sure not
// to duplicate the required validation
if in.Required && !requiredExists {
//nolint:makezero // false positive, we actually want to append here
validations = append(validations, plugin.Validation{
Type: plugin.ValidationTypeRequired,
})
Expand Down
Loading
Loading