Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor pkg/plugin and isolate connector plugin #1302

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Once you have chosen a connector to be built-in, you can:

- Download the new package and its dependencies: `go get "github.com/foo/conduit-connector-new"`
- Import the Go module defining the connector
into the [builtin registry](https://github.com/ConduitIO/conduit/blob/main/pkg/plugin/builtin/registry.go)
into the [builtin registry](https://github.com/ConduitIO/conduit/blob/main/pkg/plugin/connector/builtin/registry.go)
and add a new key to `DefaultDispenserFactories`:

```diff
Expand Down
2 changes: 1 addition & 1 deletion pkg/conduit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/database"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin/builtin"
"github.com/conduitio/conduit/pkg/plugin/connector/builtin"
"github.com/conduitio/conduit/pkg/processor"
"github.com/rs/zerolog"
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ import (
"github.com/conduitio/conduit/pkg/orchestrator"
"github.com/conduitio/conduit/pkg/pipeline"
"github.com/conduitio/conduit/pkg/plugin"
"github.com/conduitio/conduit/pkg/plugin/builtin"
"github.com/conduitio/conduit/pkg/plugin/standalone"
"github.com/conduitio/conduit/pkg/plugin/connector/builtin"
"github.com/conduitio/conduit/pkg/plugin/connector/standalone"
"github.com/conduitio/conduit/pkg/processor"
"github.com/conduitio/conduit/pkg/provisioning"
"github.com/conduitio/conduit/pkg/web/api"
Expand Down
7 changes: 4 additions & 3 deletions pkg/connector/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin"
connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"
"github.com/conduitio/conduit/pkg/record"
)

type Destination struct {
Instance *Instance

dispenser plugin.Dispenser
plugin plugin.DestinationPlugin
dispenser connectorPlugin.Dispenser
plugin connectorPlugin.DestinationPlugin

// errs is used to signal the node that the connector experienced an error
// when it was processing something asynchronously (e.g. persisting state).
Expand All @@ -37,7 +38,7 @@ type Destination struct {
// stopStream is a function that closes the context of the stream
stopStream context.CancelFunc

// wg tracks the number of in flight calls to the plugin.
// wg tracks the number of in flight calls to the connectorPlugin.
wg sync.WaitGroup
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/database/inmemory"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin"
"github.com/conduitio/conduit/pkg/plugin/mock"
"github.com/conduitio/conduit/pkg/plugin/connector/mock"
"github.com/matryer/is"
"go.uber.org/mock/gomock"
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/connector/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/inspector"
"github.com/conduitio/conduit/pkg/plugin"
connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"
)

const (
Expand Down Expand Up @@ -86,7 +86,7 @@ type Connector interface {

// PluginDispenserFetcher can fetch a plugin dispenser.
type PluginDispenserFetcher interface {
NewDispenser(logger log.CtxLogger, name string) (plugin.Dispenser, error)
NewDispenser(logger log.CtxLogger, name string) (connectorPlugin.Dispenser, error)
}

func (i *Instance) Init(logger log.CtxLogger, persister *Persister) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/connector/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ package connector
import (
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin"
connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"
)

// fakePluginFetcher fulfills the PluginFetcher interface.
type fakePluginFetcher map[string]plugin.Dispenser
type fakePluginFetcher map[string]connectorPlugin.Dispenser

func (fpf fakePluginFetcher) NewDispenser(_ log.CtxLogger, name string) (plugin.Dispenser, error) {
func (fpf fakePluginFetcher) NewDispenser(_ log.CtxLogger, name string) (connectorPlugin.Dispenser, error) {
plug, ok := fpf[name]
if !ok {
return nil, plugin.ErrPluginNotFound
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/database/inmemory"
"github.com/conduitio/conduit/pkg/foundation/database/mock"
"github.com/conduitio/conduit/pkg/foundation/log"
pmock "github.com/conduitio/conduit/pkg/plugin/mock"
pmock "github.com/conduitio/conduit/pkg/plugin/connector/mock"
"github.com/conduitio/conduit/pkg/record"
"github.com/google/uuid"
"github.com/matryer/is"
Expand Down
7 changes: 4 additions & 3 deletions pkg/connector/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin"
connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"
"github.com/conduitio/conduit/pkg/record"
)

type Source struct {
Instance *Instance

dispenser plugin.Dispenser
plugin plugin.SourcePlugin
dispenser connectorPlugin.Dispenser
plugin connectorPlugin.SourcePlugin

// errs is used to signal the node that the connector experienced an error
// when it was processing something asynchronously (e.g. persisting state).
Expand All @@ -37,7 +38,7 @@ type Source struct {
// stopStream is a function that closes the context of the stream
stopStream context.CancelFunc

// wg tracks the number of in flight calls to the plugin.
// wg tracks the number of in flight calls to the connectorPlugin.
wg sync.WaitGroup
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/database/inmemory"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin"
"github.com/conduitio/conduit/pkg/plugin/mock"
"github.com/conduitio/conduit/pkg/plugin/connector/mock"
"github.com/conduitio/conduit/pkg/record"
"github.com/matryer/is"
"go.uber.org/mock/gomock"
Expand Down
10 changes: 3 additions & 7 deletions pkg/orchestrator/connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,12 @@ func (c *ConnectorOrchestrator) Validate(
plugin string,
config connector.Config,
) error {
d, err := c.plugins.NewDispenser(c.logger, plugin)
if err != nil {
return cerrors.Errorf("couldn't get dispenser: %w", err)
}

var err error
switch t {
case connector.TypeSource:
err = c.plugins.ValidateSourceConfig(ctx, d, config.Settings)
err = c.plugins.ValidateSourceConfig(ctx, plugin, config.Settings)
case connector.TypeDestination:
err = c.plugins.ValidateDestinationConfig(ctx, d, config.Settings)
err = c.plugins.ValidateDestinationConfig(ctx, plugin, config.Settings)
default:
return cerrors.New("invalid connector type")
}
Expand Down
41 changes: 5 additions & 36 deletions pkg/orchestrator/connectors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/conduitio/conduit/pkg/foundation/database/inmemory"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/pipeline"
pmock "github.com/conduitio/conduit/pkg/plugin/mock"
"github.com/google/uuid"
"github.com/matryer/is"
"go.uber.org/mock/gomock"
Expand All @@ -34,7 +33,6 @@ func TestConnectorOrchestrator_Create_Success(t *testing.T) {
is := is.New(t)
ctx := context.Background()
db := &inmemory.DB{}
ctrl := gomock.NewController(t)
plsMock, consMock, procsMock, pluginMock := newMockServices(t)

pl := &pipeline.Instance{
Expand All @@ -57,18 +55,13 @@ func TestConnectorOrchestrator_Create_Success(t *testing.T) {
UpdatedAt: time.Now().UTC(),
}

pluginDispenser := pmock.NewDispenser(ctrl)

plsMock.EXPECT().
Get(gomock.AssignableToTypeOf(ctxType), pl.ID).
Return(pl, nil)
pluginMock.EXPECT().
NewDispenser(gomock.Any(), want.Plugin).
Return(pluginDispenser, nil)
pluginMock.EXPECT().
ValidateSourceConfig(
gomock.AssignableToTypeOf(ctxType),
pluginDispenser,
want.Plugin,
want.Config.Settings,
).Return(nil)
consMock.EXPECT().
Expand Down Expand Up @@ -159,11 +152,8 @@ func TestConnectorOrchestrator_Create_CreateConnectorError(t *testing.T) {
is := is.New(t)
ctx := context.Background()
db := &inmemory.DB{}
ctrl := gomock.NewController(t)
plsMock, consMock, procsMock, pluginMock := newMockServices(t)

pluginDispenser := pmock.NewDispenser(ctrl)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusSystemStopped,
Expand All @@ -173,13 +163,10 @@ func TestConnectorOrchestrator_Create_CreateConnectorError(t *testing.T) {
plsMock.EXPECT().
Get(gomock.AssignableToTypeOf(ctxType), pl.ID).
Return(pl, nil)
pluginMock.EXPECT().
NewDispenser(gomock.Any(), "test-plugin").
Return(pluginDispenser, nil)
pluginMock.EXPECT().
ValidateSourceConfig(
gomock.AssignableToTypeOf(ctxType),
pluginDispenser,
"test-plugin",
config.Settings,
).Return(nil)
consMock.EXPECT().
Expand All @@ -205,11 +192,8 @@ func TestConnectorOrchestrator_Create_AddConnectorError(t *testing.T) {
is := is.New(t)
ctx := context.Background()
db := &inmemory.DB{}
ctrl := gomock.NewController(t)
plsMock, consMock, procsMock, pluginMock := newMockServices(t)

pluginDispenser := pmock.NewDispenser(ctrl)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusSystemStopped,
Expand All @@ -234,13 +218,10 @@ func TestConnectorOrchestrator_Create_AddConnectorError(t *testing.T) {
plsMock.EXPECT().
Get(gomock.AssignableToTypeOf(ctxType), pl.ID).
Return(pl, nil)
pluginMock.EXPECT().
NewDispenser(gomock.Any(), conn.Plugin).
Return(pluginDispenser, nil)
pluginMock.EXPECT().
ValidateSourceConfig(
gomock.AssignableToTypeOf(ctxType),
pluginDispenser,
conn.Plugin,
conn.Config.Settings,
).Return(nil)
consMock.EXPECT().
Expand Down Expand Up @@ -458,11 +439,8 @@ func TestConnectorOrchestrator_Update_Success(t *testing.T) {
is := is.New(t)
ctx := context.Background()
db := &inmemory.DB{}
ctrl := gomock.NewController(t)
plsMock, consMock, procsMock, pluginMock := newMockServices(t)

pluginDispenser := pmock.NewDispenser(ctrl)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusSystemStopped,
Expand Down Expand Up @@ -495,10 +473,7 @@ func TestConnectorOrchestrator_Update_Success(t *testing.T) {
Get(gomock.AssignableToTypeOf(ctxType), pl.ID).
Return(pl, nil)
pluginMock.EXPECT().
NewDispenser(gomock.Any(), conn.Plugin).
Return(pluginDispenser, nil)
pluginMock.EXPECT().
ValidateSourceConfig(gomock.Any(), pluginDispenser, newConfig.Settings).
ValidateSourceConfig(gomock.Any(), conn.Plugin, newConfig.Settings).
Return(nil)
consMock.EXPECT().
Update(gomock.AssignableToTypeOf(ctxType), conn.ID, newConfig).
Expand Down Expand Up @@ -562,11 +537,8 @@ func TestConnectorOrchestrator_Update_Fail(t *testing.T) {
is := is.New(t)
ctx := context.Background()
db := &inmemory.DB{}
ctrl := gomock.NewController(t)
plsMock, consMock, procsMock, pluginMock := newMockServices(t)

pluginDispenser := pmock.NewDispenser(ctrl)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusSystemStopped,
Expand All @@ -585,10 +557,7 @@ func TestConnectorOrchestrator_Update_Fail(t *testing.T) {
Get(gomock.AssignableToTypeOf(ctxType), pl.ID).
Return(pl, nil)
pluginMock.EXPECT().
NewDispenser(gomock.Any(), conn.Plugin).
Return(pluginDispenser, nil)
pluginMock.EXPECT().
ValidateDestinationConfig(gomock.Any(), pluginDispenser, conn.Config.Settings).
ValidateDestinationConfig(gomock.Any(), conn.Plugin, conn.Config.Settings).
Return(nil)
consMock.EXPECT().
Update(gomock.AssignableToTypeOf(ctxType), conn.ID, connector.Config{}).
Expand Down
24 changes: 12 additions & 12 deletions pkg/orchestrator/mock/orchestrator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions pkg/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/database"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/pipeline"
"github.com/conduitio/conduit/pkg/plugin"
connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"
"github.com/conduitio/conduit/pkg/processor"
)

Expand Down Expand Up @@ -113,8 +113,8 @@ type ProcessorService interface {
}

type PluginService interface {
List(ctx context.Context) (map[string]plugin.Specification, error)
NewDispenser(logger log.CtxLogger, name string) (plugin.Dispenser, error)
ValidateSourceConfig(ctx context.Context, d plugin.Dispenser, settings map[string]string) error
ValidateDestinationConfig(ctx context.Context, d plugin.Dispenser, settings map[string]string) error
ListConnectors(ctx context.Context) (map[string]connectorPlugin.Specification, error)
NewDispenser(logger log.CtxLogger, name string) (connectorPlugin.Dispenser, error)
ValidateSourceConfig(ctx context.Context, name string, settings map[string]string) error
ValidateDestinationConfig(ctx context.Context, name string, settings map[string]string) error
}
4 changes: 2 additions & 2 deletions pkg/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"github.com/conduitio/conduit/pkg/orchestrator/mock"
"github.com/conduitio/conduit/pkg/pipeline"
"github.com/conduitio/conduit/pkg/plugin"
"github.com/conduitio/conduit/pkg/plugin/builtin"
"github.com/conduitio/conduit/pkg/plugin/standalone"
"github.com/conduitio/conduit/pkg/plugin/connector/builtin"
"github.com/conduitio/conduit/pkg/plugin/connector/standalone"
"github.com/conduitio/conduit/pkg/processor"
"github.com/conduitio/conduit/pkg/processor/procbuiltin"
"github.com/conduitio/conduit/pkg/record"
Expand Down
Loading
Loading