Skip to content

Commit

Permalink
BP: List processor plugins in API (#1406)
Browse files Browse the repository at this point in the history
* update proto API definitions

* fix proto linter warnings

* deprecate ListPlugins

* implement plugin list endpoint

* processor plugin orchestrator

* move plugin endpoints under connector and processor service
  • Loading branch information
lovromazgon authored Mar 6, 2024
1 parent 004faa0 commit 86ff56e
Show file tree
Hide file tree
Showing 18 changed files with 3,489 additions and 1,667 deletions.
4 changes: 2 additions & 2 deletions pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,10 +431,10 @@ func (r *Runtime) serveGRPCAPI(ctx context.Context, t *tomb.Tomb) (net.Addr, err
pipelineAPIv1 := api.NewPipelineAPIv1(r.Orchestrator.Pipelines)
pipelineAPIv1.Register(grpcServer)

processorAPIv1 := api.NewProcessorAPIv1(r.Orchestrator.Processors)
processorAPIv1 := api.NewProcessorAPIv1(r.Orchestrator.Processors, r.Orchestrator.ProcessorPlugins)
processorAPIv1.Register(grpcServer)

connectorAPIv1 := api.NewConnectorAPIv1(r.Orchestrator.Connectors)
connectorAPIv1 := api.NewConnectorAPIv1(r.Orchestrator.Connectors, r.Orchestrator.ConnectorPlugins)
connectorAPIv1.Register(grpcServer)

pluginAPIv1 := api.NewPluginAPIv1(r.Orchestrator.ConnectorPlugins)
Expand Down
2 changes: 2 additions & 0 deletions pkg/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Orchestrator struct {
Pipelines *PipelineOrchestrator
Connectors *ConnectorOrchestrator
ConnectorPlugins *ConnectorPluginOrchestrator
ProcessorPlugins *ProcessorPluginOrchestrator
}

func NewOrchestrator(
Expand All @@ -59,6 +60,7 @@ func NewOrchestrator(
Pipelines: (*PipelineOrchestrator)(&b),
Connectors: (*ConnectorOrchestrator)(&b),
ConnectorPlugins: (*ConnectorPluginOrchestrator)(&b),
ProcessorPlugins: (*ProcessorPluginOrchestrator)(&b),
}
}

Expand Down
27 changes: 27 additions & 0 deletions pkg/orchestrator/processor_plugins.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright © 2022 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package orchestrator

import (
"context"

processorSdk "github.com/conduitio/conduit-processor-sdk"
)

type ProcessorPluginOrchestrator base

func (ps *ProcessorPluginOrchestrator) List(ctx context.Context) (map[string]processorSdk.Specification, error) {
return ps.processorPlugins.List(ctx)
}
66 changes: 56 additions & 10 deletions pkg/web/api/connector_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@

//go:generate mockgen -destination=mock/connector.go -package=mock -mock_names=ConnectorOrchestrator=ConnectorOrchestrator . ConnectorOrchestrator
//go:generate mockgen -destination=mock/connector_service.go -package=mock -mock_names=ConnectorService_InspectConnectorServer=ConnectorService_InspectConnectorServer github.com/conduitio/conduit/proto/api/v1 ConnectorService_InspectConnectorServer
//go:generate mockgen -destination=mock/connector_plugin.go -package=mock -mock_names=ConnectorPluginOrchestrator=ConnectorPluginOrchestrator . ConnectorPluginOrchestrator

package api

import (
"context"
"regexp"

connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"

"github.com/conduitio/conduit/pkg/connector"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
Expand All @@ -39,13 +44,25 @@ type ConnectorOrchestrator interface {
Inspect(ctx context.Context, id string) (*inspector.Session, error)
}

type ConnectorPluginOrchestrator interface {
// List will return all connector plugins' specs.
List(ctx context.Context) (map[string]connectorPlugin.Specification, error)
}

type ConnectorAPIv1 struct {
apiv1.UnimplementedConnectorServiceServer
cs ConnectorOrchestrator
connectorOrchestrator ConnectorOrchestrator
connectorPluginOrchestrator ConnectorPluginOrchestrator
}

func NewConnectorAPIv1(cs ConnectorOrchestrator) *ConnectorAPIv1 {
return &ConnectorAPIv1{cs: cs}
func NewConnectorAPIv1(
co ConnectorOrchestrator,
cpo ConnectorPluginOrchestrator,
) *ConnectorAPIv1 {
return &ConnectorAPIv1{
connectorOrchestrator: co,
connectorPluginOrchestrator: cpo,
}
}

func (c *ConnectorAPIv1) Register(srv *grpc.Server) {
Expand All @@ -57,7 +74,7 @@ func (c *ConnectorAPIv1) ListConnectors(
req *apiv1.ListConnectorsRequest,
) (*apiv1.ListConnectorsResponse, error) {
// TODO: Implement filtering and limiting.
list := c.cs.List(ctx)
list := c.connectorOrchestrator.List(ctx)
var clist []*apiv1.Connector
for _, v := range list {
if req.PipelineId == "" || req.PipelineId == v.PipelineID {
Expand All @@ -73,7 +90,7 @@ func (c *ConnectorAPIv1) InspectConnector(req *apiv1.InspectConnectorRequest, se
return status.ConnectorError(cerrors.ErrEmptyID)
}

session, err := c.cs.Inspect(server.Context(), req.Id)
session, err := c.connectorOrchestrator.Inspect(server.Context(), req.Id)
if err != nil {
return status.ConnectorError(cerrors.Errorf("failed to inspect connector: %w", err))
}
Expand Down Expand Up @@ -105,7 +122,7 @@ func (c *ConnectorAPIv1) GetConnector(
}

// fetch the connector from the ConnectorOrchestrator
pr, err := c.cs.Get(ctx, req.Id)
pr, err := c.connectorOrchestrator.Get(ctx, req.Id)
if err != nil {
return nil, status.ConnectorError(cerrors.Errorf("failed to get connector by ID: %w", err))
}
Expand All @@ -121,7 +138,7 @@ func (c *ConnectorAPIv1) CreateConnector(
ctx context.Context,
req *apiv1.CreateConnectorRequest,
) (*apiv1.CreateConnectorResponse, error) {
created, err := c.cs.Create(
created, err := c.connectorOrchestrator.Create(
ctx,
fromproto.ConnectorType(req.Type),
req.Plugin,
Expand All @@ -145,7 +162,7 @@ func (c *ConnectorAPIv1) UpdateConnector(
return nil, cerrors.ErrEmptyID
}

updated, err := c.cs.Update(ctx, req.Id, fromproto.ConnectorConfig(req.Config))
updated, err := c.connectorOrchestrator.Update(ctx, req.Id, fromproto.ConnectorConfig(req.Config))

if err != nil {
return nil, status.ConnectorError(cerrors.Errorf("failed to update connector: %w", err))
Expand All @@ -157,7 +174,7 @@ func (c *ConnectorAPIv1) UpdateConnector(
}

func (c *ConnectorAPIv1) DeleteConnector(ctx context.Context, req *apiv1.DeleteConnectorRequest) (*apiv1.DeleteConnectorResponse, error) {
err := c.cs.Delete(ctx, req.Id)
err := c.connectorOrchestrator.Delete(ctx, req.Id)

if err != nil {
return nil, status.ConnectorError(cerrors.Errorf("failed to delete connector: %w", err))
Expand All @@ -172,7 +189,7 @@ func (c *ConnectorAPIv1) ValidateConnector(
ctx context.Context,
req *apiv1.ValidateConnectorRequest,
) (*apiv1.ValidateConnectorResponse, error) {
err := c.cs.Validate(
err := c.connectorOrchestrator.Validate(
ctx,
fromproto.ConnectorType(req.Type),
req.Plugin,
Expand All @@ -185,3 +202,32 @@ func (c *ConnectorAPIv1) ValidateConnector(

return &apiv1.ValidateConnectorResponse{}, nil
}

func (c *ConnectorAPIv1) ListConnectorPlugins(
ctx context.Context,
req *apiv1.ListConnectorPluginsRequest,
) (*apiv1.ListConnectorPluginsResponse, error) {
var nameFilter *regexp.Regexp
if req.GetName() != "" {
var err error
nameFilter, err = regexp.Compile("^" + req.GetName() + "$")
if err != nil {
return nil, status.PluginError(cerrors.New("invalid name regex"))
}
}

mp, err := c.connectorPluginOrchestrator.List(ctx)
if err != nil {
return nil, status.PluginError(err)
}
var plist []*apiv1.ConnectorPluginSpecifications

for name, v := range mp {
if nameFilter != nil && !nameFilter.MatchString(name) {
continue // don't add to result list, filter didn't match
}
plist = append(plist, toproto.ConnectorPluginSpecifications(name, v))
}

return &apiv1.ListConnectorPluginsResponse{Plugins: plist}, nil
}
87 changes: 76 additions & 11 deletions pkg/web/api/connector_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"testing"
"time"

connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"

"github.com/conduitio/conduit/pkg/connector"
"github.com/conduitio/conduit/pkg/foundation/cchan"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
Expand All @@ -41,7 +43,7 @@ func TestConnectorAPIv1_ListConnectors(t *testing.T) {
ctx := context.Background()
ctrl := gomock.NewController(t)
csMock := apimock.NewConnectorOrchestrator(ctrl)
api := NewConnectorAPIv1(csMock)
api := NewConnectorAPIv1(csMock, nil)

source := newTestSource()
destination := newTestDestination()
Expand Down Expand Up @@ -118,7 +120,7 @@ func TestConnectorAPIv1_ListConnectorsByPipeline(t *testing.T) {
ctx := context.Background()
ctrl := gomock.NewController(t)
csMock := apimock.NewConnectorOrchestrator(ctrl)
api := NewConnectorAPIv1(csMock)
api := NewConnectorAPIv1(csMock, nil)

source := newTestSource()
destination := newTestDestination()
Expand Down Expand Up @@ -171,7 +173,7 @@ func TestConnectorAPIv1_CreateConnector(t *testing.T) {
ctx := context.Background()
ctrl := gomock.NewController(t)
csMock := apimock.NewConnectorOrchestrator(ctrl)
api := NewConnectorAPIv1(csMock)
api := NewConnectorAPIv1(csMock, nil)

source := newTestSource()

Expand Down Expand Up @@ -221,7 +223,7 @@ func TestConnectorAPIv1_InspectConnector_SendRecord(t *testing.T) {
defer cancel()
ctrl := gomock.NewController(t)
csMock := apimock.NewConnectorOrchestrator(ctrl)
api := NewConnectorAPIv1(csMock)
api := NewConnectorAPIv1(csMock, nil)

id := uuid.NewString()
rec := generateTestRecord()
Expand Down Expand Up @@ -258,7 +260,7 @@ func TestConnectorAPIv1_InspectConnector_SendErr(t *testing.T) {
defer cancel()
ctrl := gomock.NewController(t)
csMock := apimock.NewConnectorOrchestrator(ctrl)
api := NewConnectorAPIv1(csMock)
api := NewConnectorAPIv1(csMock, nil)
id := uuid.NewString()

ins := inspector.New(log.Nop(), 10)
Expand Down Expand Up @@ -297,7 +299,7 @@ func TestConnectorAPIv1_InspectConnector_Err(t *testing.T) {
defer cancel()
ctrl := gomock.NewController(t)
csMock := apimock.NewConnectorOrchestrator(ctrl)
api := NewConnectorAPIv1(csMock)
api := NewConnectorAPIv1(csMock, nil)
id := uuid.NewString()
err := cerrors.New("not found, sorry")

Expand Down Expand Up @@ -338,7 +340,7 @@ func TestConnectorAPIv1_GetConnector(t *testing.T) {
ctx := context.Background()
ctrl := gomock.NewController(t)
csMock := apimock.NewConnectorOrchestrator(ctrl)
api := NewConnectorAPIv1(csMock)
api := NewConnectorAPIv1(csMock, nil)

source := newTestSource()

Expand Down Expand Up @@ -384,7 +386,7 @@ func TestConnectorAPIv1_UpdateConnector(t *testing.T) {
ctx := context.Background()
ctrl := gomock.NewController(t)
csMock := apimock.NewConnectorOrchestrator(ctrl)
api := NewConnectorAPIv1(csMock)
api := NewConnectorAPIv1(csMock, nil)

before := newTestSource()
after := newTestSource()
Expand Down Expand Up @@ -437,7 +439,7 @@ func TestConnectorAPIv1_DeleteConnector(t *testing.T) {
ctx := context.Background()
ctrl := gomock.NewController(t)
csMock := apimock.NewConnectorOrchestrator(ctrl)
api := NewConnectorAPIv1(csMock)
api := NewConnectorAPIv1(csMock, nil)

id := uuid.NewString()

Expand All @@ -462,7 +464,7 @@ func TestConnectorAPIv1_ValidateConnector(t *testing.T) {
ctx := context.Background()
ctrl := gomock.NewController(t)
csMock := apimock.NewConnectorOrchestrator(ctrl)
api := NewConnectorAPIv1(csMock)
api := NewConnectorAPIv1(csMock, nil)

config := connector.Config{
Name: "A source connector",
Expand Down Expand Up @@ -496,7 +498,7 @@ func TestConnectorAPIv1_ValidateConnectorError(t *testing.T) {
ctx := context.Background()
ctrl := gomock.NewController(t)
csMock := apimock.NewConnectorOrchestrator(ctrl)
api := NewConnectorAPIv1(csMock)
api := NewConnectorAPIv1(csMock, nil)

config := connector.Config{
Name: "A source connector",
Expand All @@ -522,6 +524,69 @@ func TestConnectorAPIv1_ValidateConnectorError(t *testing.T) {
is.True(err != nil)
}

func TestConnectorAPIv1_ListConnectorPluginsByName(t *testing.T) {
is := is.New(t)

ctx := context.Background()
ctrl := gomock.NewController(t)
cpoMock := apimock.NewConnectorPluginOrchestrator(ctrl)
api := NewConnectorAPIv1(nil, cpoMock)

names := []string{"do-not-want-this-plugin", "want-p1", "want-p2", "skip", "another-skipped"}

plsMap := make(map[string]connectorPlugin.Specification)
pls := make([]connectorPlugin.Specification, 0)

for _, name := range names {
ps := connectorPlugin.Specification{
Name: name,
Description: "desc",
Version: "v1.0",
Author: "Aaron",
SourceParams: map[string]connectorPlugin.Parameter{
"param": {
Type: connectorPlugin.ParameterTypeString,
Validations: []connectorPlugin.Validation{{
Type: connectorPlugin.ValidationTypeRequired,
}},
},
},
DestinationParams: map[string]connectorPlugin.Parameter{},
}
pls = append(pls, ps)
plsMap[name] = ps
}

cpoMock.EXPECT().
List(ctx).
Return(plsMap, nil).
Times(1)

want := &apiv1.ListConnectorPluginsResponse{
Plugins: []*apiv1.ConnectorPluginSpecifications{
toproto.ConnectorPluginSpecifications(pls[1].Name, pls[1]),
toproto.ConnectorPluginSpecifications(pls[2].Name, pls[2]),
},
}

got, err := api.ListConnectorPlugins(
ctx,
&apiv1.ListConnectorPluginsRequest{Name: "want-.*"},
)

is.NoErr(err)

sortPlugins := func(p []*apiv1.ConnectorPluginSpecifications) {
sort.Slice(p, func(i, j int) bool {
return p[i].Name < p[j].Name
})
}

sortPlugins(want.Plugins)
sortPlugins(got.Plugins)
is.Equal(want, got)
}

func sortConnectors(c []*apiv1.Connector) {
sort.Slice(c, func(i, j int) bool {
return c[i].Id < c[j].Id
Expand Down
Loading

0 comments on commit 86ff56e

Please sign in to comment.