From 203aeb78ca58a0267924a59026b8dceecf39f94c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Haris=20Osmanagi=C4=87?= Date: Thu, 18 Jul 2024 18:13:42 +0200 Subject: [PATCH] [Schemas] Limit schema service usage to connectors (#1701) --- go.mod | 6 +- go.sum | 12 +- pkg/conduit/runtime.go | 176 +++++++++++------- pkg/connector/instance.go | 4 +- pkg/connector/instance_test.go | 2 +- pkg/orchestrator/mock/orchestrator.go | 12 +- pkg/orchestrator/orchestrator.go | 2 +- pkg/orchestrator/orchestrator_test.go | 1 + pkg/pipeline/lifecycle.go | 2 +- pkg/pipeline/lifecycle_test.go | 2 +- pkg/plugin/connector/builtin/dispenser.go | 3 + .../connector/builtin/dispenser_test.go | 1 + pkg/plugin/connector/builtin/registry.go | 67 ++++--- pkg/plugin/connector/connutils/schema.go | 26 +++ pkg/plugin/connector/connutils/schema_test.go | 57 ++++++ pkg/plugin/connector/service.go | 55 ++++-- .../connector/standalone/dispenser_test.go | 10 +- pkg/plugin/connector/standalone/registry.go | 52 ++++-- .../connector/standalone/registry_test.go | 15 +- pkg/provisioning/interfaces.go | 2 +- pkg/provisioning/mock/provisioning.go | 12 +- pkg/provisioning/service_test.go | 1 + 22 files changed, 357 insertions(+), 163 deletions(-) create mode 100644 pkg/plugin/connector/connutils/schema_test.go diff --git a/go.mod b/go.mod index 1de45ac12..c3e92ede7 100644 --- a/go.mod +++ b/go.mod @@ -8,15 +8,15 @@ require ( github.com/Masterminds/sprig/v3 v3.2.3 github.com/NYTimes/gziphandler v1.1.1 github.com/bufbuild/buf v1.34.0 - github.com/conduitio/conduit-commons v0.2.1-0.20240708122218-5d1883981cfc + github.com/conduitio/conduit-commons v0.2.1-0.20240709142247-d973cba9694c github.com/conduitio/conduit-connector-file v0.6.1-0.20240621111422-221c138201d3 github.com/conduitio/conduit-connector-generator v0.6.1-0.20240621111436-e9fa3464f7b2 github.com/conduitio/conduit-connector-kafka v0.8.1-0.20240621111431-87c01cf39a06 github.com/conduitio/conduit-connector-log v0.3.1-0.20240621111440-e2f0f04a35a4 github.com/conduitio/conduit-connector-postgres v0.7.6-0.20240630172132-84b5a6e6104f - github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240705154009-b938cfa7f251 + github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240716144613-8ef04cf70e73 github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240630172807-e278fde1fb46 - github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240705162050-971c5f7facc2 + github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240716152132-20953984628f github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd github.com/conduitio/conduit-schema-registry v0.0.0-20240705193355-7e2064b44e0d github.com/conduitio/yaml/v3 v3.3.0 diff --git a/go.sum b/go.sum index 749549565..1b31be39d 100644 --- a/go.sum +++ b/go.sum @@ -211,8 +211,8 @@ github.com/ckaznocha/intrange v0.1.2/go.mod h1:RWffCw/vKBwHeOEwWdCikAtY0q4gGt8Vh github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c= -github.com/conduitio/conduit-commons v0.2.1-0.20240708122218-5d1883981cfc h1:q2o3R/cXbQBUrJFBHV1YietwB275rBjkmPu1njzV4bk= -github.com/conduitio/conduit-commons v0.2.1-0.20240708122218-5d1883981cfc/go.mod h1:oF0KZc+TiVGBdKoMLLQ9Otb83Shq4CRZ2hClhNlFRio= +github.com/conduitio/conduit-commons v0.2.1-0.20240709142247-d973cba9694c h1:RK1K+aWz49ce72Y4xadv0dXj0qH2JeQBVbj0xEW/dfg= +github.com/conduitio/conduit-commons v0.2.1-0.20240709142247-d973cba9694c/go.mod h1:XdZNIIVGjm7mpPkE1VsuJGs+we5sc8CX1rle7jTfsOM= github.com/conduitio/conduit-connector-file v0.6.1-0.20240621111422-221c138201d3 h1:/mdy7vQzdfqDFLM13M39CYwI6Pk7xClMVZpGQW3+5DQ= github.com/conduitio/conduit-connector-file v0.6.1-0.20240621111422-221c138201d3/go.mod h1:bCnmA+29l871cNhroZfiCS2O8+GhBNVECfL5DOof2ew= github.com/conduitio/conduit-connector-generator v0.6.1-0.20240621111436-e9fa3464f7b2 h1:WMKvmvaE/E+03/0nz/2JpyelCd2nPtOTuBy3eyWcI58= @@ -223,12 +223,12 @@ github.com/conduitio/conduit-connector-log v0.3.1-0.20240621111440-e2f0f04a35a4 github.com/conduitio/conduit-connector-log v0.3.1-0.20240621111440-e2f0f04a35a4/go.mod h1:6IkveRPUPJDCtdH6vXOW1T+B8Vj99OA+szybqYSnlyY= github.com/conduitio/conduit-connector-postgres v0.7.6-0.20240630172132-84b5a6e6104f h1:p8CH8UlYkOSlqOREJtUW9eHm6fyn3M+5b0lUQByMVvg= github.com/conduitio/conduit-connector-postgres v0.7.6-0.20240630172132-84b5a6e6104f/go.mod h1:2v+hTwyTZFjM9evlMv6Id9M/rVuCZgzUnA3szRnWOiI= -github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240705154009-b938cfa7f251 h1:X/dY6GJ8PxIDPgqpWO0bZqBoHrBUVA+8x//tO50PQMk= -github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240705154009-b938cfa7f251/go.mod h1:LDGRw1uphxd0MNaF9NbLUbFwoJWS+GehsX4eQYau6f4= +github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240716144613-8ef04cf70e73 h1:WnfXToyuoA6hEI+Elb3sVsm1DURW+7mdc3u7D1iFxiY= +github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240716144613-8ef04cf70e73/go.mod h1:nN3W0Y+IA+CokBbqnNgOZNNc1yW4BJOQm47AsQa5nUM= github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240630172807-e278fde1fb46 h1:tur/pSyX1RLzkxiBwhsV1qa6wP60pb20hJMptH5RRJY= github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240630172807-e278fde1fb46/go.mod h1:m+pf2cMF+qCwhMj9gUBV1BPGLPYauhtYkj2zFddfvdE= -github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240705162050-971c5f7facc2 h1:L7RrVxEn7qlAzDKzSO+bu0N371MwVlwx3SOdnCiYSfU= -github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240705162050-971c5f7facc2/go.mod h1:8gTzxxOZ8tRf7XvWxIDviLLcOPa3oXlVueleYYTZzNs= +github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240716152132-20953984628f h1:n9b+5+V0J2XG2xgC1MToG5t9a5CvGaHhsLQTFwBPmLQ= +github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240716152132-20953984628f/go.mod h1:emiX/c7+CFn89lX+4mjycRyh1HPrU4AaVON2KWtP/i4= github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd h1:R+tpcZKWOnr6LRsXr85C167SK9MhaLhYUEjBSUupU9Y= github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd/go.mod h1:E9zqj0atY1+yBHWi4eZ3TagCZSBnFxBQBUcZktL6RFE= github.com/conduitio/conduit-schema-registry v0.0.0-20240705193355-7e2064b44e0d h1:C6wRzdyqdQQCL/lCruAsH0j1JoN2GEZBQdDHoxA2B0o= diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index 3839d6ccd..816c3d5fa 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -84,6 +84,8 @@ type Runtime struct { DB database.DB Orchestrator *orchestrator.Orchestrator ProvisionService *provisioning.Service + SchemaRegistry schemaregistry.Registry + // Ready will be closed when Runtime has successfully started Ready chan struct{} @@ -94,9 +96,7 @@ type Runtime struct { connectorPluginService *conn_plugin.PluginService processorPluginService *proc_plugin.PluginService - schemaRegistry schemaregistry.Registry - connSchemaService *connutils.SchemaService - + schemaService *connutils.SchemaService connectorPersister *connector.Persister logger log.CtxLogger @@ -154,7 +154,7 @@ func NewRuntime(cfg Config) (*Runtime, error) { logger: logger, } - err := initServices(r) + err := createServices(r) if err != nil { return nil, cerrors.Errorf("failed to initialize services: %w", err) } @@ -163,7 +163,7 @@ func NewRuntime(cfg Config) (*Runtime, error) { } // Create all necessary internal services -func initServices(r *Runtime) error { +func createServices(r *Runtime) error { standaloneReg, err := proc_standalone.NewRegistry(r.logger, r.Config.Processors.Path) if err != nil { return cerrors.Errorf("failed creating processor registry: %w", err) @@ -175,24 +175,19 @@ func initServices(r *Runtime) error { standaloneReg, ) - var schemaRegistry schemaregistry.Registry - switch r.Config.SchemaRegistry.Type { - case SchemaRegistryTypeConfluent: - schemaRegistry, err = schemaregistry.NewClient(r.logger, sr.URLs(r.Config.SchemaRegistry.Confluent.ConnectionString)) - if err != nil { - return cerrors.Errorf("failed to create schema registry client: %w", err) - } - case SchemaRegistryTypeBuiltin: - schemaRegistry = conduitschemaregistry.NewSchemaRegistry() - default: - // shouldn't happen, we validate the config - return cerrors.Errorf("invalid schema registry type %q", r.Config.SchemaRegistry.Type) + schemaRegistry, err := createSchemaRegistry(r.Config, r.logger) + if err != nil { + return cerrors.Errorf("failed to create schema registry: %w", err) } + schemaService := connutils.NewSchemaService(r.logger, schemaRegistry) - connSchemaService := connutils.NewSchemaService(r.logger, schemaRegistry) connPluginService := conn_plugin.NewPluginService( r.logger, - conn_builtin.NewRegistry(r.logger, r.Config.ConnectorPlugins, connSchemaService), + conn_builtin.NewRegistry( + r.logger, + r.Config.ConnectorPlugins, + schemaService, + ), conn_standalone.NewRegistry(r.logger, r.Config.Connectors.Path), ) @@ -206,17 +201,38 @@ func initServices(r *Runtime) error { r.Orchestrator = orc r.ProvisionService = provisionService + r.SchemaRegistry = schemaRegistry + r.pipelineService = plService r.connectorService = connService r.processorService = procService r.connectorPluginService = connPluginService r.processorPluginService = procPluginService - r.schemaRegistry = schemaRegistry - r.connSchemaService = connSchemaService + r.schemaService = schemaService return nil } +func createSchemaRegistry(config Config, logger log.CtxLogger) (schemaregistry.Registry, error) { + var schemaRegistry schemaregistry.Registry + var err error + + switch config.SchemaRegistry.Type { + case SchemaRegistryTypeConfluent: + schemaRegistry, err = schemaregistry.NewClient(logger, sr.URLs(config.SchemaRegistry.Confluent.ConnectionString)) + if err != nil { + return nil, cerrors.Errorf("failed to create schema registry client: %w", err) + } + case SchemaRegistryTypeBuiltin: + schemaRegistry = conduitschemaregistry.NewSchemaRegistry() + default: + // shouldn't happen, we validate the config + return nil, cerrors.Errorf("invalid schema registry type %q", config.SchemaRegistry.Type) + } + + return schemaRegistry, nil +} + func newGRPCStatsHandler() *promgrpc.StatsHandler { h := promgrpc.ServerStatsHandler() promclient.MustRegister(h) @@ -270,55 +286,10 @@ func (r *Runtime) Run(ctx context.Context) (err error) { // Register cleanup function that will run after tomb is killed r.registerCleanup(t) - // Init each service - err = r.processorService.Init(ctx) + // Initialize all services + err = r.initServices(ctx, t) if err != nil { - return cerrors.Errorf("failed to init processor service: %w", err) - } - err = r.connectorService.Init(ctx) - if err != nil { - return cerrors.Errorf("failed to init connector service: %w", err) - } - - if r.Config.Pipelines.ExitOnError { - r.pipelineService.OnFailure(func(e pipeline.FailureEvent) { - r.logger.Warn(ctx). - Err(e.Error). - Str(log.PipelineIDField, e.ID). - Msg("Conduit will shut down due to a pipeline failure and 'exit on error' enabled") - t.Kill(cerrors.Errorf("shut down due to 'exit on error' enabled: %w", e.Error)) - }) - } - err = r.pipelineService.Init(ctx) - if err != nil { - return cerrors.Errorf("failed to init pipeline service: %w", err) - } - - err = r.ProvisionService.Init(ctx) - if err != nil { - cerrors.ForEach(err, func(err error) { - r.logger.Err(ctx, err).Msg("provisioning failed") - }) - if r.Config.Pipelines.ExitOnError { - r.logger.Warn(ctx). - Err(err). - Msg("Conduit will shut down due to a pipeline provisioning failure and 'exit on error' enabled") - err = cerrors.Errorf("shut down due to 'exit on error' enabled: %w", err) - return err - } - } - - err = r.pipelineService.Run(ctx, r.connectorService, r.processorService, r.connectorPluginService) - if err != nil { - cerrors.ForEach(err, func(err error) { - r.logger.Err(ctx, err).Msg("pipeline failed to be started") - }) - } - - // APIs needed by connector plugins - _, err = r.startConnectorUtils(ctx, t) - if err != nil { - return cerrors.Errorf("failed to start connector utilities: %w", err) + return cerrors.Errorf("failed to initialize services: %w", err) } // Public gRPC and HTTP API @@ -504,7 +475,7 @@ func (r *Runtime) startConnectorUtils(ctx context.Context, t *tomb.Tomb) (net.Ad grpc.StatsHandler(r.gRPCStatsHandler), ) - schemaServiceAPI := pconduitserver.NewSchemaServiceServer(r.connSchemaService) + schemaServiceAPI := pconduitserver.NewSchemaServiceServer(r.schemaService) conduitv1.RegisterSchemaServiceServer(grpcServer, schemaServiceAPI) // Makes it easier to use command line tools to interact @@ -515,7 +486,7 @@ func (r *Runtime) startConnectorUtils(ctx context.Context, t *tomb.Tomb) (net.Ad // Names taken from schema.proto healthServer := api.NewHealthServer( map[string]api.Checker{ - "SchemaService": r.connSchemaService, + "SchemaService": r.schemaService, }, r.logger, ) @@ -747,3 +718,66 @@ func (r *Runtime) serveHTTP( return ln.Addr(), nil } + +func (r *Runtime) initServices(ctx context.Context, t *tomb.Tomb) error { + err := r.processorService.Init(ctx) + if err != nil { + return cerrors.Errorf("failed to init processor service: %w", err) + } + + token := r.schemaService.Token() + + // Initialize APIs needed by connector plugins + // Needs to be initialized before connectorPluginService + // because the standalone connector registry needs to run all plugins, + // and the plugins initialize a connector utils client when they are run. + connUtilsAddr, err := r.startConnectorUtils(ctx, t) + if err != nil { + return cerrors.Errorf("failed to start connector utilities API: %w", err) + } + r.logger.Info(ctx).Msgf("connector utilities started on %v", connUtilsAddr) + + r.connectorPluginService.Init(ctx, connUtilsAddr.String(), token) + + err = r.connectorService.Init(ctx) + if err != nil { + return cerrors.Errorf("failed to init connector service: %w", err) + } + + if r.Config.Pipelines.ExitOnError { + r.pipelineService.OnFailure(func(e pipeline.FailureEvent) { + r.logger.Warn(ctx). + Err(e.Error). + Str(log.PipelineIDField, e.ID). + Msg("Conduit will shut down due to a pipeline failure and 'exit on error' enabled") + t.Kill(cerrors.Errorf("shut down due to 'exit on error' enabled: %w", e.Error)) + }) + } + err = r.pipelineService.Init(ctx) + if err != nil { + return cerrors.Errorf("failed to init pipeline service: %w", err) + } + + err = r.ProvisionService.Init(ctx) + if err != nil { + cerrors.ForEach(err, func(err error) { + r.logger.Err(ctx, err).Msg("provisioning failed") + }) + if r.Config.Pipelines.ExitOnError { + r.logger.Warn(ctx). + Err(err). + Msg("Conduit will shut down due to a pipeline provisioning failure and 'exit on error' enabled") + err = cerrors.Errorf("shut down due to 'exit on error' enabled: %w", err) + return err + } + } + + err = r.pipelineService.Run(ctx, r.connectorService, r.processorService, r.connectorPluginService) + if err != nil { + cerrors.ForEach(err, func(err error) { + r.logger.Err(ctx, err).Msg("pipeline failed to be started") + }) + } + + return nil +} diff --git a/pkg/connector/instance.go b/pkg/connector/instance.go index 656e4442b..c5e905b8f 100644 --- a/pkg/connector/instance.go +++ b/pkg/connector/instance.go @@ -86,7 +86,7 @@ type Connector interface { // PluginDispenserFetcher can fetch a plugin dispenser. type PluginDispenserFetcher interface { - NewDispenser(logger log.CtxLogger, name string) (connectorPlugin.Dispenser, error) + NewDispenser(logger log.CtxLogger, name string, connectorID string) (connectorPlugin.Dispenser, error) } func (i *Instance) Init(logger log.CtxLogger, persister *Persister) { @@ -120,7 +120,7 @@ func (i *Instance) Connector(_ context.Context, dispenserFetcher PluginDispenser return nil, ErrConnectorRunning } - pluginDispenser, err := dispenserFetcher.NewDispenser(i.logger, i.Plugin) + pluginDispenser, err := dispenserFetcher.NewDispenser(i.logger, i.Plugin, i.ID) if err != nil { return nil, cerrors.Errorf("failed to get plugin dispenser: %w", err) } diff --git a/pkg/connector/instance_test.go b/pkg/connector/instance_test.go index aa755daae..918004f88 100644 --- a/pkg/connector/instance_test.go +++ b/pkg/connector/instance_test.go @@ -23,7 +23,7 @@ import ( // fakePluginFetcher fulfills the PluginFetcher interface. type fakePluginFetcher map[string]connectorPlugin.Dispenser -func (fpf fakePluginFetcher) NewDispenser(_ log.CtxLogger, name string) (connectorPlugin.Dispenser, error) { +func (fpf fakePluginFetcher) NewDispenser(_ log.CtxLogger, name string, _ string) (connectorPlugin.Dispenser, error) { plug, ok := fpf[name] if !ok { return nil, plugin.ErrPluginNotFound diff --git a/pkg/orchestrator/mock/orchestrator.go b/pkg/orchestrator/mock/orchestrator.go index 6b9e99642..25a5645f3 100644 --- a/pkg/orchestrator/mock/orchestrator.go +++ b/pkg/orchestrator/mock/orchestrator.go @@ -1122,18 +1122,18 @@ func (c *ConnectorPluginServiceListCall) DoAndReturn(f func(context.Context) (ma } // NewDispenser mocks base method. -func (m *ConnectorPluginService) NewDispenser(arg0 log.CtxLogger, arg1 string) (connector0.Dispenser, error) { +func (m *ConnectorPluginService) NewDispenser(arg0 log.CtxLogger, arg1, arg2 string) (connector0.Dispenser, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NewDispenser", arg0, arg1) + ret := m.ctrl.Call(m, "NewDispenser", arg0, arg1, arg2) ret0, _ := ret[0].(connector0.Dispenser) ret1, _ := ret[1].(error) return ret0, ret1 } // NewDispenser indicates an expected call of NewDispenser. -func (mr *ConnectorPluginServiceMockRecorder) NewDispenser(arg0, arg1 any) *ConnectorPluginServiceNewDispenserCall { +func (mr *ConnectorPluginServiceMockRecorder) NewDispenser(arg0, arg1, arg2 any) *ConnectorPluginServiceNewDispenserCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewDispenser", reflect.TypeOf((*ConnectorPluginService)(nil).NewDispenser), arg0, arg1) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewDispenser", reflect.TypeOf((*ConnectorPluginService)(nil).NewDispenser), arg0, arg1, arg2) return &ConnectorPluginServiceNewDispenserCall{Call: call} } @@ -1149,13 +1149,13 @@ func (c *ConnectorPluginServiceNewDispenserCall) Return(arg0 connector0.Dispense } // Do rewrite *gomock.Call.Do -func (c *ConnectorPluginServiceNewDispenserCall) Do(f func(log.CtxLogger, string) (connector0.Dispenser, error)) *ConnectorPluginServiceNewDispenserCall { +func (c *ConnectorPluginServiceNewDispenserCall) Do(f func(log.CtxLogger, string, string) (connector0.Dispenser, error)) *ConnectorPluginServiceNewDispenserCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *ConnectorPluginServiceNewDispenserCall) DoAndReturn(f func(log.CtxLogger, string) (connector0.Dispenser, error)) *ConnectorPluginServiceNewDispenserCall { +func (c *ConnectorPluginServiceNewDispenserCall) DoAndReturn(f func(log.CtxLogger, string, string) (connector0.Dispenser, error)) *ConnectorPluginServiceNewDispenserCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/pkg/orchestrator/orchestrator.go b/pkg/orchestrator/orchestrator.go index d85cdb0bc..ca4a7f2eb 100644 --- a/pkg/orchestrator/orchestrator.go +++ b/pkg/orchestrator/orchestrator.go @@ -122,7 +122,7 @@ type ProcessorService interface { type ConnectorPluginService interface { List(ctx context.Context) (map[string]pconnector.Specification, error) - NewDispenser(logger log.CtxLogger, name string) (connectorPlugin.Dispenser, error) + NewDispenser(logger log.CtxLogger, name string, connectorID string) (connectorPlugin.Dispenser, error) ValidateSourceConfig(ctx context.Context, name string, settings map[string]string) error ValidateDestinationConfig(ctx context.Context, name string, settings map[string]string) error } diff --git a/pkg/orchestrator/orchestrator_test.go b/pkg/orchestrator/orchestrator_test.go index b07cf09d9..383c2f567 100644 --- a/pkg/orchestrator/orchestrator_test.go +++ b/pkg/orchestrator/orchestrator_test.go @@ -77,6 +77,7 @@ func TestPipelineSimple(t *testing.T) { conn_builtin.NewRegistry(logger, conn_builtin.DefaultBuiltinConnectors, schemaService), conn_standalone.NewRegistry(logger, ""), ) + connPluginService.Init(ctx, "conn-utils-token:12345", "conn-utils-token") procPluginService := proc_plugin.NewPluginService( logger, diff --git a/pkg/pipeline/lifecycle.go b/pkg/pipeline/lifecycle.go index 716fd00a6..255b49002 100644 --- a/pkg/pipeline/lifecycle.go +++ b/pkg/pipeline/lifecycle.go @@ -48,7 +48,7 @@ type ProcessorService interface { // PluginDispenserFetcher can fetch a plugin. type PluginDispenserFetcher interface { - NewDispenser(logger log.CtxLogger, name string) (connectorPlugin.Dispenser, error) + NewDispenser(logger log.CtxLogger, name string, connectorID string) (connectorPlugin.Dispenser, error) } // Run runs pipelines that had the running state in store. diff --git a/pkg/pipeline/lifecycle_test.go b/pkg/pipeline/lifecycle_test.go index 8d659bd09..0441097c1 100644 --- a/pkg/pipeline/lifecycle_test.go +++ b/pkg/pipeline/lifecycle_test.go @@ -557,7 +557,7 @@ func (tpf testProcessorFetcher) Get(_ context.Context, id string) (*processor.In // testPluginFetcher fulfills the PluginFetcher interface. type testPluginFetcher map[string]connectorPlugin.Dispenser -func (tpf testPluginFetcher) NewDispenser(_ log.CtxLogger, name string) (connectorPlugin.Dispenser, error) { +func (tpf testPluginFetcher) NewDispenser(_ log.CtxLogger, name string, _ string) (connectorPlugin.Dispenser, error) { plug, ok := tpf[name] if !ok { return nil, plugin.ErrPluginNotFound diff --git a/pkg/plugin/connector/builtin/dispenser.go b/pkg/plugin/connector/builtin/dispenser.go index 89f8be6aa..d30ee5391 100644 --- a/pkg/plugin/connector/builtin/dispenser.go +++ b/pkg/plugin/connector/builtin/dispenser.go @@ -23,6 +23,7 @@ import ( type Dispenser struct { name plugin.FullName + connectorID string logger log.CtxLogger specifierPlugin func() pconnector.SpecifierPlugin sourcePlugin func() pconnector.SourcePlugin @@ -31,6 +32,7 @@ type Dispenser struct { func NewDispenser( name plugin.FullName, + connectorID string, logger log.CtxLogger, specifierPlugin func() pconnector.SpecifierPlugin, sourcePlugin func() pconnector.SourcePlugin, @@ -38,6 +40,7 @@ func NewDispenser( ) *Dispenser { return &Dispenser{ name: name, + connectorID: connectorID, logger: logger, specifierPlugin: specifierPlugin, sourcePlugin: sourcePlugin, diff --git a/pkg/plugin/connector/builtin/dispenser_test.go b/pkg/plugin/connector/builtin/dispenser_test.go index 49111663e..4f5d7392d 100644 --- a/pkg/plugin/connector/builtin/dispenser_test.go +++ b/pkg/plugin/connector/builtin/dispenser_test.go @@ -35,6 +35,7 @@ func newTestDispenser(t *testing.T) (connector.Dispenser, *mock.SpecifierPlugin, dispenser := NewDispenser( "mockPlugin", + "mock-connector-id", logger, func() pconnector.SpecifierPlugin { return mockSpecifier }, func() pconnector.SourcePlugin { return mockSource }, diff --git a/pkg/plugin/connector/builtin/registry.go b/pkg/plugin/connector/builtin/registry.go index 13584bcfa..d94fa51d0 100644 --- a/pkg/plugin/connector/builtin/registry.go +++ b/pkg/plugin/connector/builtin/registry.go @@ -51,9 +51,11 @@ var ( type Registry struct { logger log.CtxLogger + connectors map[string]sdk.Connector // plugins stores plugin blueprints in a 2D map, first key is the plugin // name, the second key is the plugin version plugins map[string]map[string]blueprint + service *connutils.SchemaService } type blueprint struct { @@ -62,9 +64,9 @@ type blueprint struct { dispenserFactory dispenserFactory } -type dispenserFactory func(name plugin.FullName, logger log.CtxLogger) connector.Dispenser +type dispenserFactory func(name plugin.FullName, connectorID string, logger log.CtxLogger) connector.Dispenser -func newDispenserFactory(conn sdk.Connector) dispenserFactory { +func newDispenserFactory(conn sdk.Connector, token string) dispenserFactory { if conn.NewSource == nil { conn.NewSource = func() sdk.Source { return nil } } @@ -72,43 +74,64 @@ func newDispenserFactory(conn sdk.Connector) dispenserFactory { conn.NewDestination = func() sdk.Destination { return nil } } - return func(name plugin.FullName, logger log.CtxLogger) connector.Dispenser { + cfg := pconnector.PluginConfig{ + Token: token, + } + + return func(name plugin.FullName, connectorID string, logger log.CtxLogger) connector.Dispenser { return NewDispenser( name, + connectorID, logger, func() pconnector.SpecifierPlugin { return sdk.NewSpecifierPlugin(conn.NewSpecification(), conn.NewSource(), conn.NewDestination()) }, - func() pconnector.SourcePlugin { return sdk.NewSourcePlugin(conn.NewSource()) }, - func() pconnector.DestinationPlugin { return sdk.NewDestinationPlugin(conn.NewDestination()) }, + func() pconnector.SourcePlugin { + // TODO add connector ID to cfg + // TODO generate token (based on the connector ID) and add token to cfg + // TODO get log level from logger/config + return sdk.NewSourcePlugin(conn.NewSource(), cfg) + }, + func() pconnector.DestinationPlugin { + // TODO add connector ID to cfg + // TODO generate token (based on the connector ID) and add token to cfg + // TODO get log level from logger/config + return sdk.NewDestinationPlugin(conn.NewDestination(), cfg) + }, ) } } func NewRegistry(logger log.CtxLogger, connectors map[string]sdk.Connector, service *connutils.SchemaService) *Registry { logger = logger.WithComponentFromType(Registry{}) - buildInfo, ok := debug.ReadBuildInfo() - if !ok { - // we are using modules, build info should always be available, we are staying on the safe side - logger.Warn(context.Background()).Msg("build info not available, built-in plugin versions may not be read correctly") - buildInfo = &debug.BuildInfo{} // prevent nil pointer exceptions - } - // The built-in plugins use Conduit's own schema service schema.Service = service r := &Registry{ - plugins: loadPlugins(buildInfo, connectors), - logger: logger, + logger: logger, + connectors: connectors, + service: service, } - logger.Info(context.Background()).Int("count", len(r.List())).Msg("builtin connector plugins initialized") + return r } -func loadPlugins(buildInfo *debug.BuildInfo, connectors map[string]sdk.Connector) map[string]map[string]blueprint { - plugins := make(map[string]map[string]blueprint, len(connectors)) - for moduleName, conn := range connectors { - factory := newDispenserFactory(conn) +func (r *Registry) Init(ctx context.Context) { + buildInfo, ok := debug.ReadBuildInfo() + if !ok { + // we are using modules, build info should always be available, we are staying on the safe side + r.logger.Warn(ctx).Msg("build info not available, built-in plugin versions may not be read correctly") + buildInfo = &debug.BuildInfo{} // prevent nil pointer exceptions + } + + r.plugins = r.loadPlugins(buildInfo) + r.logger.Info(ctx).Int("count", len(r.List())).Msg("builtin connector plugins initialized") +} + +func (r *Registry) loadPlugins(buildInfo *debug.BuildInfo) map[string]map[string]blueprint { + plugins := make(map[string]map[string]blueprint, len(r.connectors)) + for moduleName, conn := range r.connectors { + factory := newDispenserFactory(conn, r.service.Token()) specs, err := getSpecification(moduleName, factory, buildInfo) if err != nil { @@ -143,7 +166,7 @@ func loadPlugins(buildInfo *debug.BuildInfo, connectors map[string]sdk.Connector } func getSpecification(moduleName string, factory dispenserFactory, buildInfo *debug.BuildInfo) (pconnector.Specification, error) { - dispenser := factory("", log.CtxLogger{}) + dispenser := factory("", "", log.CtxLogger{}) specPlugin, err := dispenser.DispenseSpecifier() if err != nil { return pconnector.Specification{}, cerrors.Errorf("could not dispense specifier for built in plugin: %w", err) @@ -177,7 +200,7 @@ func newFullName(pluginName, pluginVersion string) plugin.FullName { return plugin.NewFullName(plugin.PluginTypeBuiltin, pluginName, pluginVersion) } -func (r *Registry) NewDispenser(logger log.CtxLogger, fullName plugin.FullName) (connector.Dispenser, error) { +func (r *Registry) NewDispenser(logger log.CtxLogger, fullName plugin.FullName, connectorID string) (connector.Dispenser, error) { versionMap, ok := r.plugins[fullName.PluginName()] if !ok { return nil, plugin.ErrPluginNotFound @@ -191,7 +214,7 @@ func (r *Registry) NewDispenser(logger log.CtxLogger, fullName plugin.FullName) return nil, cerrors.Errorf("could not find builtin plugin %q, only found versions %v: %w", fullName, availableVersions, plugin.ErrPluginNotFound) } - return b.dispenserFactory(fullName, logger), nil + return b.dispenserFactory(fullName, connectorID, logger), nil } func (r *Registry) List() map[plugin.FullName]pconnector.Specification { diff --git a/pkg/plugin/connector/connutils/schema.go b/pkg/plugin/connector/connutils/schema.go index ea9142a79..727932953 100644 --- a/pkg/plugin/connector/connutils/schema.go +++ b/pkg/plugin/connector/connutils/schema.go @@ -24,12 +24,14 @@ import ( "github.com/conduitio/conduit/pkg/schemaregistry" "github.com/conduitio/conduit/pkg/schemaregistry/fromschema" "github.com/conduitio/conduit/pkg/schemaregistry/toschema" + "github.com/google/uuid" "github.com/twmb/franz-go/pkg/sr" ) type SchemaService struct { registry schemaregistry.Registry logger log.CtxLogger + token string } var _ pconduit.SchemaService = (*SchemaService)(nil) @@ -38,6 +40,7 @@ func NewSchemaService(logger log.CtxLogger, registry schemaregistry.Registry) *S return &SchemaService{ registry: registry, logger: logger.WithComponent("connutils.SchemaService"), + token: uuid.NewString(), } } @@ -50,6 +53,11 @@ func (s *SchemaService) Check(ctx context.Context) error { } func (s *SchemaService) CreateSchema(ctx context.Context, req pconduit.CreateSchemaRequest) (pconduit.CreateSchemaResponse, error) { + err := s.validateToken(ctx) + if err != nil { + return pconduit.CreateSchemaResponse{}, err + } + ss, err := s.registry.CreateSchema(ctx, req.Subject, sr.Schema{ Schema: string(req.Bytes), Type: fromschema.SrSchemaType(req.Type), @@ -67,6 +75,11 @@ func (s *SchemaService) CreateSchema(ctx context.Context, req pconduit.CreateSch } func (s *SchemaService) GetSchema(ctx context.Context, req pconduit.GetSchemaRequest) (pconduit.GetSchemaResponse, error) { + err := s.validateToken(ctx) + if err != nil { + return pconduit.GetSchemaResponse{}, err + } + ss, err := s.registry.SchemaBySubjectVersion(ctx, req.Subject, req.Version) if err != nil { var respErr *sr.ResponseError @@ -81,6 +94,19 @@ func (s *SchemaService) GetSchema(ctx context.Context, req pconduit.GetSchemaReq }, nil } +func (s *SchemaService) Token() string { + return s.token +} + +func (s *SchemaService) validateToken(ctx context.Context) error { + token := pconduit.ConnectorTokenFromContext(ctx) + if token != s.Token() { + return cerrors.Errorf("token %q is invalid", token) + } + + return nil +} + func unwrapSrError(e *sr.ResponseError) error { switch e.ErrorCode { case conduitschemaregistry.ErrorCodeSubjectNotFound, diff --git a/pkg/plugin/connector/connutils/schema_test.go b/pkg/plugin/connector/connutils/schema_test.go new file mode 100644 index 000000000..494af0d52 --- /dev/null +++ b/pkg/plugin/connector/connutils/schema_test.go @@ -0,0 +1,57 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connutils + +import ( + "context" + "testing" + + "github.com/conduitio/conduit-connector-protocol/pconduit" + conduitschemaregistry "github.com/conduitio/conduit-schema-registry" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/matryer/is" +) + +func TestSchemaService_ValidateToken(t *testing.T) { + testCases := []struct { + name string + token string + wantErr string + }{ + { + name: "no token", + token: "", + wantErr: "token \"\" is invalid", + }, + { + name: "invalid token", + token: "abc", + wantErr: "token \"abc\" is invalid", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + is := is.New(t) + ctx := pconduit.ContextWithConnectorToken(context.Background(), tc.token) + + underTest := NewSchemaService(log.Nop(), conduitschemaregistry.NewSchemaRegistry()) + _, err := underTest.CreateSchema(ctx, pconduit.CreateSchemaRequest{}) + + is.True(err != nil) + is.Equal(err.Error(), tc.wantErr) + }) + } +} diff --git a/pkg/plugin/connector/service.go b/pkg/plugin/connector/service.go index 8528c4873..f37caf1dd 100644 --- a/pkg/plugin/connector/service.go +++ b/pkg/plugin/connector/service.go @@ -26,29 +26,41 @@ import ( // registry is an object that can create new plugin dispensers. We need to use // an interface to prevent a cyclic dependency between the plugin package and // builtin and standalone packages. -// There are two registries that implement this interface: -// - The built-in registry creates a dispenser which dispenses a plugin adapter -// that communicates with the plugin directly as if it was a library. These -// plugins are baked into the Conduit binary and included at compile time. -// - The standalone registry creates a dispenser which starts the plugin in a -// separate process and communicates with it via gRPC. These plugins are -// compiled independently of Conduit and can be included at runtime. +// There are two registries that implement this interface: builtinReg and standaloneReg type registry interface { - NewDispenser(logger log.CtxLogger, name plugin.FullName) (Dispenser, error) + NewDispenser(logger log.CtxLogger, name plugin.FullName, connectorID string) (Dispenser, error) List() map[plugin.FullName]pconnector.Specification } +// The built-in registry creates a dispenser which dispenses a plugin adapter +// that communicates with the plugin directly as if it was a library. These +// plugins are baked into the Conduit binary and included at compile time. +type builtinReg interface { + registry + + Init(context.Context) +} + +// standaloneReg creates a dispenser which starts the plugin in a +// separate process and communicates with it via gRPC. These plugins are +// compiled independently of Conduit and can be included at runtime. +type standaloneReg interface { + registry + + Init(ctx context.Context, connUtilsAddr string, connUtilsToken string) +} + type PluginService struct { logger log.CtxLogger - builtinReg registry - standaloneReg registry + builtinReg builtinReg + standaloneReg standaloneReg } func NewPluginService( logger log.CtxLogger, - builtin registry, - standalone registry, + builtin builtinReg, + standalone standaloneReg, ) *PluginService { return &PluginService{ logger: logger.WithComponent("connector.PluginService"), @@ -57,24 +69,29 @@ func NewPluginService( } } +func (s *PluginService) Init(ctx context.Context, connUtilsAddr string, connUtilsToken string) { + s.builtinReg.Init(ctx) + s.standaloneReg.Init(ctx, connUtilsAddr, connUtilsToken) +} + func (s *PluginService) Check(context.Context) error { return nil } -func (s *PluginService) NewDispenser(logger log.CtxLogger, name string) (Dispenser, error) { +func (s *PluginService) NewDispenser(logger log.CtxLogger, name string, connectorID string) (Dispenser, error) { logger = logger.WithComponent("plugin") fullName := plugin.FullName(name) switch fullName.PluginType() { case plugin.PluginTypeStandalone: - return s.standaloneReg.NewDispenser(logger, fullName) + return s.standaloneReg.NewDispenser(logger, fullName, connectorID) case plugin.PluginTypeBuiltin: - return s.builtinReg.NewDispenser(logger, fullName) + return s.builtinReg.NewDispenser(logger, fullName, connectorID) case plugin.PluginTypeAny: - d, err := s.standaloneReg.NewDispenser(logger, fullName) + d, err := s.standaloneReg.NewDispenser(logger, fullName, connectorID) if err != nil { s.logger.Debug(context.Background()).Err(err).Msg("could not find standalone plugin dispenser, falling back to builtin plugin") - d, err = s.builtinReg.NewDispenser(logger, fullName) + d, err = s.builtinReg.NewDispenser(logger, fullName, connectorID) } return d, err default: @@ -98,7 +115,7 @@ func (s *PluginService) List(context.Context) (map[string]pconnector.Specificati } func (s *PluginService) ValidateSourceConfig(ctx context.Context, name string, settings map[string]string) (err error) { - d, err := s.NewDispenser(s.logger, name) + d, err := s.NewDispenser(s.logger, name, "validate-source-config") if err != nil { return cerrors.Errorf("couldn't get dispenser: %w", err) } @@ -124,7 +141,7 @@ func (s *PluginService) ValidateSourceConfig(ctx context.Context, name string, s } func (s *PluginService) ValidateDestinationConfig(ctx context.Context, name string, settings map[string]string) (err error) { - d, err := s.NewDispenser(s.logger, name) + d, err := s.NewDispenser(s.logger, name, "validate-destination-config") if err != nil { return cerrors.Errorf("couldn't get dispenser: %w", err) } diff --git a/pkg/plugin/connector/standalone/dispenser_test.go b/pkg/plugin/connector/standalone/dispenser_test.go index ef2f6d590..fe3245f33 100644 --- a/pkg/plugin/connector/standalone/dispenser_test.go +++ b/pkg/plugin/connector/standalone/dispenser_test.go @@ -16,7 +16,6 @@ package standalone import ( "context" - "os" "strconv" "testing" "time" @@ -54,7 +53,7 @@ func newTestDispenser(t *testing.T, logger zerolog.Logger, version int) ( go func() { defer close(closeCh) // Trick to convince the plugin it should use a specific protocol version - os.Setenv("PLUGIN_PROTOCOL_VERSIONS", strconv.Itoa(version)) + t.Setenv("PLUGIN_PROTOCOL_VERSIONS", strconv.Itoa(version)) err := server.Serve( func() pconnector.SpecifierPlugin { return mockSpecifier }, func() pconnector.SourcePlugin { return mockSource }, @@ -78,7 +77,12 @@ func newTestDispenser(t *testing.T, logger zerolog.Logger, version int) ( } // Connect - dispenser, err := NewDispenser(logger, "", client.WithReattachConfig(config), reattachVersionedPluginOption{version: version}) + dispenser, err := NewDispenser( + logger, + "", + client.WithReattachConfig(config), + reattachVersionedPluginOption{version: version}, + ) if err != nil { t.Fatal("could not create dispenser", err) } diff --git a/pkg/plugin/connector/standalone/registry.go b/pkg/plugin/connector/standalone/registry.go index 6a9467246..862513424 100644 --- a/pkg/plugin/connector/standalone/registry.go +++ b/pkg/plugin/connector/standalone/registry.go @@ -21,7 +21,9 @@ import ( "path/filepath" "sync" + "github.com/conduitio/conduit-connector-protocol/pconduit" "github.com/conduitio/conduit-connector-protocol/pconnector" + "github.com/conduitio/conduit-connector-protocol/pconnector/client" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/conduitio/conduit/pkg/plugin" @@ -30,8 +32,10 @@ import ( ) type Registry struct { - logger log.CtxLogger - pluginDir string + logger log.CtxLogger + pluginDir string + connUtilsAddr string + connUtilsToken string // plugins stores plugin blueprints in a 2D map, first key is the plugin // name, the second key is the plugin version @@ -60,30 +64,32 @@ func NewRegistry(logger log.CtxLogger, pluginDir string) *Registry { r.logger.Warn(context.Background()).Err(err).Msg("could not extract absolute connector plugins path") } else { r.pluginDir = absPluginDir // store plugin dir for hot reloads - r.reloadPlugins() } } - r.logger.Info(context.Background()). - Str(log.PluginPathField, r.pluginDir). - Int("count", len(r.List())). - Msg("standalone connector plugins initialized") - return r } -func (r *Registry) reloadPlugins() { - plugins := r.loadPlugins(context.Background(), r.pluginDir) +func (r *Registry) Init(ctx context.Context, connUtilsAddr string, connUtilsToken string) { + r.connUtilsAddr = connUtilsAddr + r.connUtilsToken = connUtilsToken + + plugins := r.loadPlugins(ctx) r.m.Lock() r.plugins = plugins r.m.Unlock() + + r.logger.Info(ctx). + Str(log.PluginPathField, r.pluginDir). + Int("count", len(r.List())). + Msg("standalone connector plugins initialized") } -func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string]map[string]blueprint { - r.logger.Debug(ctx).Msgf("loading connector plugins from directory %v", pluginDir) +func (r *Registry) loadPlugins(ctx context.Context) map[string]map[string]blueprint { + r.logger.Debug(ctx).Msgf("loading connector plugins from directory %v", r.pluginDir) plugins := make(map[string]map[string]blueprint) - dirEntries, err := os.ReadDir(pluginDir) + dirEntries, err := os.ReadDir(r.pluginDir) if err != nil { r.logger.Warn(ctx).Err(err).Msg("could not read connector plugin directory") return plugins // return empty map @@ -101,7 +107,7 @@ func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string continue } - pluginPath := path.Join(pluginDir, dirEntry.Name()) + pluginPath := path.Join(r.pluginDir, dirEntry.Name()) specs, err := r.loadSpecifications(pluginPath) if err != nil { @@ -157,7 +163,13 @@ func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string func (r *Registry) loadSpecifications(pluginPath string) (pconnector.Specification, error) { // create dispenser without a logger to not spam logs on refresh - dispenser, err := NewDispenser(zerolog.Nop(), pluginPath) + dispenser, err := NewDispenser( + zerolog.Nop(), + pluginPath, + client.WithEnvVar(pconduit.EnvConduitConnectorUtilitiesGRPCTarget, r.connUtilsAddr), + client.WithEnvVar(pconduit.EnvConduitConnectorSchemaToken, r.connUtilsToken), + client.WithEnvVar(pconduit.EnvConduitConnectorID, "load-specifications"), + ) if err != nil { return pconnector.Specification{}, cerrors.Errorf("failed to create connector dispenser: %w", err) } @@ -175,7 +187,7 @@ func (r *Registry) loadSpecifications(pluginPath string) (pconnector.Specificati return resp.Specification, nil } -func (r *Registry) NewDispenser(logger log.CtxLogger, fullName plugin.FullName) (connector.Dispenser, error) { +func (r *Registry) NewDispenser(logger log.CtxLogger, fullName plugin.FullName, connectorID string) (connector.Dispenser, error) { r.m.RLock() defer r.m.RUnlock() @@ -193,7 +205,13 @@ func (r *Registry) NewDispenser(logger log.CtxLogger, fullName plugin.FullName) } logger = logger.WithComponent("plugin.standalone") - return NewDispenser(logger.ZerologWithComponent(), bp.Path) + return NewDispenser( + logger.ZerologWithComponent(), + bp.Path, + client.WithEnvVar(pconduit.EnvConduitConnectorUtilitiesGRPCTarget, r.connUtilsAddr), + client.WithEnvVar(pconduit.EnvConduitConnectorSchemaToken, r.connUtilsToken), + client.WithEnvVar(pconduit.EnvConduitConnectorID, connectorID), + ) } func (r *Registry) List() map[plugin.FullName]pconnector.Specification { diff --git a/pkg/plugin/connector/standalone/registry_test.go b/pkg/plugin/connector/standalone/registry_test.go index 3d5ea5fc4..88cfbd601 100644 --- a/pkg/plugin/connector/standalone/registry_test.go +++ b/pkg/plugin/connector/standalone/registry_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "path" + "path/filepath" "regexp" "testing" @@ -54,9 +55,14 @@ const ( ) func testPluginBlueprint() blueprint { + pluginPath, err := filepath.Abs(path.Join(testPluginDir, "testplugin.sh")) + if err != nil { + panic(err) + } + return blueprint{ FullName: plugin.FullName(fmt.Sprintf("standalone:%v@%v", testPluginName, testPluginVersion)), - Path: path.Join(testPluginDir, "testplugin.sh"), + Path: pluginPath, Specification: pconnector.Specification{ Name: testPluginName, Summary: testPluginSummary, @@ -106,9 +112,10 @@ func testPluginBlueprint() blueprint { func TestRegistry_loadPlugins(t *testing.T) { is := is.New(t) + ctx := context.Background() - r := NewRegistry(log.Test(t), "") - got := r.loadPlugins(context.Background(), testPluginDir) + r := NewRegistry(log.Test(t), testPluginDir) + got := r.loadPlugins(ctx) want := map[string]map[string]blueprint{ testPluginName: { testPluginVersion: testPluginBlueprint(), @@ -122,8 +129,10 @@ func TestRegistry_loadPlugins(t *testing.T) { func TestRegistry_List(t *testing.T) { is := is.New(t) + ctx := context.Background() r := NewRegistry(log.Nop(), testPluginDir) + r.Init(ctx, ":12345", "irrelevant-token") got := r.List() bp := testPluginBlueprint() diff --git a/pkg/provisioning/interfaces.go b/pkg/provisioning/interfaces.go index 218e0cf11..d4e8e6f72 100644 --- a/pkg/provisioning/interfaces.go +++ b/pkg/provisioning/interfaces.go @@ -70,5 +70,5 @@ type ProcessorService interface { } type ConnectorPluginService interface { - NewDispenser(ctx log.CtxLogger, name string) (connectorPlugin.Dispenser, error) + NewDispenser(ctx log.CtxLogger, name string, connectorID string) (connectorPlugin.Dispenser, error) } diff --git a/pkg/provisioning/mock/provisioning.go b/pkg/provisioning/mock/provisioning.go index fdd8b9bac..e7c8ce85c 100644 --- a/pkg/provisioning/mock/provisioning.go +++ b/pkg/provisioning/mock/provisioning.go @@ -1005,18 +1005,18 @@ func (m *ConnectorPluginService) EXPECT() *ConnectorPluginServiceMockRecorder { } // NewDispenser mocks base method. -func (m *ConnectorPluginService) NewDispenser(arg0 log.CtxLogger, arg1 string) (connector0.Dispenser, error) { +func (m *ConnectorPluginService) NewDispenser(arg0 log.CtxLogger, arg1, arg2 string) (connector0.Dispenser, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NewDispenser", arg0, arg1) + ret := m.ctrl.Call(m, "NewDispenser", arg0, arg1, arg2) ret0, _ := ret[0].(connector0.Dispenser) ret1, _ := ret[1].(error) return ret0, ret1 } // NewDispenser indicates an expected call of NewDispenser. -func (mr *ConnectorPluginServiceMockRecorder) NewDispenser(arg0, arg1 any) *ConnectorPluginServiceNewDispenserCall { +func (mr *ConnectorPluginServiceMockRecorder) NewDispenser(arg0, arg1, arg2 any) *ConnectorPluginServiceNewDispenserCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewDispenser", reflect.TypeOf((*ConnectorPluginService)(nil).NewDispenser), arg0, arg1) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewDispenser", reflect.TypeOf((*ConnectorPluginService)(nil).NewDispenser), arg0, arg1, arg2) return &ConnectorPluginServiceNewDispenserCall{Call: call} } @@ -1032,13 +1032,13 @@ func (c *ConnectorPluginServiceNewDispenserCall) Return(arg0 connector0.Dispense } // Do rewrite *gomock.Call.Do -func (c *ConnectorPluginServiceNewDispenserCall) Do(f func(log.CtxLogger, string) (connector0.Dispenser, error)) *ConnectorPluginServiceNewDispenserCall { +func (c *ConnectorPluginServiceNewDispenserCall) Do(f func(log.CtxLogger, string, string) (connector0.Dispenser, error)) *ConnectorPluginServiceNewDispenserCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *ConnectorPluginServiceNewDispenserCall) DoAndReturn(f func(log.CtxLogger, string) (connector0.Dispenser, error)) *ConnectorPluginServiceNewDispenserCall { +func (c *ConnectorPluginServiceNewDispenserCall) DoAndReturn(f func(log.CtxLogger, string, string) (connector0.Dispenser, error)) *ConnectorPluginServiceNewDispenserCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/pkg/provisioning/service_test.go b/pkg/provisioning/service_test.go index 8ea502eab..5a73f9063 100644 --- a/pkg/provisioning/service_test.go +++ b/pkg/provisioning/service_test.go @@ -493,6 +493,7 @@ func TestService_IntegrationTestServices(t *testing.T) { builtin.NewRegistry(logger, builtin.DefaultBuiltinConnectors, schemaService), standalone.NewRegistry(logger, ""), ) + connPluginService.Init(ctx, "conn-utils-token:12345", "conn-utils-token") procPluginService := proc_plugin.NewPluginService( logger,