Skip to content

Commit

Permalink
[Schemas] Limit schema service usage to connectors (#1701)
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso authored Jul 18, 2024
1 parent 3c5362f commit 203aeb7
Show file tree
Hide file tree
Showing 22 changed files with 357 additions and 163 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ require (
github.com/Masterminds/sprig/v3 v3.2.3
github.com/NYTimes/gziphandler v1.1.1
github.com/bufbuild/buf v1.34.0
github.com/conduitio/conduit-commons v0.2.1-0.20240708122218-5d1883981cfc
github.com/conduitio/conduit-commons v0.2.1-0.20240709142247-d973cba9694c
github.com/conduitio/conduit-connector-file v0.6.1-0.20240621111422-221c138201d3
github.com/conduitio/conduit-connector-generator v0.6.1-0.20240621111436-e9fa3464f7b2
github.com/conduitio/conduit-connector-kafka v0.8.1-0.20240621111431-87c01cf39a06
github.com/conduitio/conduit-connector-log v0.3.1-0.20240621111440-e2f0f04a35a4
github.com/conduitio/conduit-connector-postgres v0.7.6-0.20240630172132-84b5a6e6104f
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240705154009-b938cfa7f251
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240716144613-8ef04cf70e73
github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240630172807-e278fde1fb46
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240705162050-971c5f7facc2
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240716152132-20953984628f
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd
github.com/conduitio/conduit-schema-registry v0.0.0-20240705193355-7e2064b44e0d
github.com/conduitio/yaml/v3 v3.3.0
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ github.com/ckaznocha/intrange v0.1.2/go.mod h1:RWffCw/vKBwHeOEwWdCikAtY0q4gGt8Vh
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c=
github.com/conduitio/conduit-commons v0.2.1-0.20240708122218-5d1883981cfc h1:q2o3R/cXbQBUrJFBHV1YietwB275rBjkmPu1njzV4bk=
github.com/conduitio/conduit-commons v0.2.1-0.20240708122218-5d1883981cfc/go.mod h1:oF0KZc+TiVGBdKoMLLQ9Otb83Shq4CRZ2hClhNlFRio=
github.com/conduitio/conduit-commons v0.2.1-0.20240709142247-d973cba9694c h1:RK1K+aWz49ce72Y4xadv0dXj0qH2JeQBVbj0xEW/dfg=
github.com/conduitio/conduit-commons v0.2.1-0.20240709142247-d973cba9694c/go.mod h1:XdZNIIVGjm7mpPkE1VsuJGs+we5sc8CX1rle7jTfsOM=
github.com/conduitio/conduit-connector-file v0.6.1-0.20240621111422-221c138201d3 h1:/mdy7vQzdfqDFLM13M39CYwI6Pk7xClMVZpGQW3+5DQ=
github.com/conduitio/conduit-connector-file v0.6.1-0.20240621111422-221c138201d3/go.mod h1:bCnmA+29l871cNhroZfiCS2O8+GhBNVECfL5DOof2ew=
github.com/conduitio/conduit-connector-generator v0.6.1-0.20240621111436-e9fa3464f7b2 h1:WMKvmvaE/E+03/0nz/2JpyelCd2nPtOTuBy3eyWcI58=
Expand All @@ -223,12 +223,12 @@ github.com/conduitio/conduit-connector-log v0.3.1-0.20240621111440-e2f0f04a35a4
github.com/conduitio/conduit-connector-log v0.3.1-0.20240621111440-e2f0f04a35a4/go.mod h1:6IkveRPUPJDCtdH6vXOW1T+B8Vj99OA+szybqYSnlyY=
github.com/conduitio/conduit-connector-postgres v0.7.6-0.20240630172132-84b5a6e6104f h1:p8CH8UlYkOSlqOREJtUW9eHm6fyn3M+5b0lUQByMVvg=
github.com/conduitio/conduit-connector-postgres v0.7.6-0.20240630172132-84b5a6e6104f/go.mod h1:2v+hTwyTZFjM9evlMv6Id9M/rVuCZgzUnA3szRnWOiI=
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240705154009-b938cfa7f251 h1:X/dY6GJ8PxIDPgqpWO0bZqBoHrBUVA+8x//tO50PQMk=
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240705154009-b938cfa7f251/go.mod h1:LDGRw1uphxd0MNaF9NbLUbFwoJWS+GehsX4eQYau6f4=
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240716144613-8ef04cf70e73 h1:WnfXToyuoA6hEI+Elb3sVsm1DURW+7mdc3u7D1iFxiY=
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240716144613-8ef04cf70e73/go.mod h1:nN3W0Y+IA+CokBbqnNgOZNNc1yW4BJOQm47AsQa5nUM=
github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240630172807-e278fde1fb46 h1:tur/pSyX1RLzkxiBwhsV1qa6wP60pb20hJMptH5RRJY=
github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240630172807-e278fde1fb46/go.mod h1:m+pf2cMF+qCwhMj9gUBV1BPGLPYauhtYkj2zFddfvdE=
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240705162050-971c5f7facc2 h1:L7RrVxEn7qlAzDKzSO+bu0N371MwVlwx3SOdnCiYSfU=
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240705162050-971c5f7facc2/go.mod h1:8gTzxxOZ8tRf7XvWxIDviLLcOPa3oXlVueleYYTZzNs=
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240716152132-20953984628f h1:n9b+5+V0J2XG2xgC1MToG5t9a5CvGaHhsLQTFwBPmLQ=
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240716152132-20953984628f/go.mod h1:emiX/c7+CFn89lX+4mjycRyh1HPrU4AaVON2KWtP/i4=
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd h1:R+tpcZKWOnr6LRsXr85C167SK9MhaLhYUEjBSUupU9Y=
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd/go.mod h1:E9zqj0atY1+yBHWi4eZ3TagCZSBnFxBQBUcZktL6RFE=
github.com/conduitio/conduit-schema-registry v0.0.0-20240705193355-7e2064b44e0d h1:C6wRzdyqdQQCL/lCruAsH0j1JoN2GEZBQdDHoxA2B0o=
Expand Down
176 changes: 105 additions & 71 deletions pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ type Runtime struct {
DB database.DB
Orchestrator *orchestrator.Orchestrator
ProvisionService *provisioning.Service
SchemaRegistry schemaregistry.Registry

// Ready will be closed when Runtime has successfully started
Ready chan struct{}

Expand All @@ -94,9 +96,7 @@ type Runtime struct {
connectorPluginService *conn_plugin.PluginService
processorPluginService *proc_plugin.PluginService

schemaRegistry schemaregistry.Registry
connSchemaService *connutils.SchemaService

schemaService *connutils.SchemaService
connectorPersister *connector.Persister

logger log.CtxLogger
Expand Down Expand Up @@ -154,7 +154,7 @@ func NewRuntime(cfg Config) (*Runtime, error) {
logger: logger,
}

err := initServices(r)
err := createServices(r)
if err != nil {
return nil, cerrors.Errorf("failed to initialize services: %w", err)
}
Expand All @@ -163,7 +163,7 @@ func NewRuntime(cfg Config) (*Runtime, error) {
}

// Create all necessary internal services
func initServices(r *Runtime) error {
func createServices(r *Runtime) error {
standaloneReg, err := proc_standalone.NewRegistry(r.logger, r.Config.Processors.Path)
if err != nil {
return cerrors.Errorf("failed creating processor registry: %w", err)
Expand All @@ -175,24 +175,19 @@ func initServices(r *Runtime) error {
standaloneReg,
)

var schemaRegistry schemaregistry.Registry
switch r.Config.SchemaRegistry.Type {
case SchemaRegistryTypeConfluent:
schemaRegistry, err = schemaregistry.NewClient(r.logger, sr.URLs(r.Config.SchemaRegistry.Confluent.ConnectionString))
if err != nil {
return cerrors.Errorf("failed to create schema registry client: %w", err)
}
case SchemaRegistryTypeBuiltin:
schemaRegistry = conduitschemaregistry.NewSchemaRegistry()
default:
// shouldn't happen, we validate the config
return cerrors.Errorf("invalid schema registry type %q", r.Config.SchemaRegistry.Type)
schemaRegistry, err := createSchemaRegistry(r.Config, r.logger)
if err != nil {
return cerrors.Errorf("failed to create schema registry: %w", err)
}
schemaService := connutils.NewSchemaService(r.logger, schemaRegistry)

connSchemaService := connutils.NewSchemaService(r.logger, schemaRegistry)
connPluginService := conn_plugin.NewPluginService(
r.logger,
conn_builtin.NewRegistry(r.logger, r.Config.ConnectorPlugins, connSchemaService),
conn_builtin.NewRegistry(
r.logger,
r.Config.ConnectorPlugins,
schemaService,
),
conn_standalone.NewRegistry(r.logger, r.Config.Connectors.Path),
)

Expand All @@ -206,17 +201,38 @@ func initServices(r *Runtime) error {

r.Orchestrator = orc
r.ProvisionService = provisionService
r.SchemaRegistry = schemaRegistry

r.pipelineService = plService
r.connectorService = connService
r.processorService = procService
r.connectorPluginService = connPluginService
r.processorPluginService = procPluginService
r.schemaRegistry = schemaRegistry
r.connSchemaService = connSchemaService
r.schemaService = schemaService

return nil
}

func createSchemaRegistry(config Config, logger log.CtxLogger) (schemaregistry.Registry, error) {
var schemaRegistry schemaregistry.Registry
var err error

switch config.SchemaRegistry.Type {
case SchemaRegistryTypeConfluent:
schemaRegistry, err = schemaregistry.NewClient(logger, sr.URLs(config.SchemaRegistry.Confluent.ConnectionString))
if err != nil {
return nil, cerrors.Errorf("failed to create schema registry client: %w", err)
}
case SchemaRegistryTypeBuiltin:
schemaRegistry = conduitschemaregistry.NewSchemaRegistry()
default:
// shouldn't happen, we validate the config
return nil, cerrors.Errorf("invalid schema registry type %q", config.SchemaRegistry.Type)
}

return schemaRegistry, nil
}

func newGRPCStatsHandler() *promgrpc.StatsHandler {
h := promgrpc.ServerStatsHandler()
promclient.MustRegister(h)
Expand Down Expand Up @@ -270,55 +286,10 @@ func (r *Runtime) Run(ctx context.Context) (err error) {
// Register cleanup function that will run after tomb is killed
r.registerCleanup(t)

// Init each service
err = r.processorService.Init(ctx)
// Initialize all services
err = r.initServices(ctx, t)
if err != nil {
return cerrors.Errorf("failed to init processor service: %w", err)
}
err = r.connectorService.Init(ctx)
if err != nil {
return cerrors.Errorf("failed to init connector service: %w", err)
}

if r.Config.Pipelines.ExitOnError {
r.pipelineService.OnFailure(func(e pipeline.FailureEvent) {
r.logger.Warn(ctx).
Err(e.Error).
Str(log.PipelineIDField, e.ID).
Msg("Conduit will shut down due to a pipeline failure and 'exit on error' enabled")
t.Kill(cerrors.Errorf("shut down due to 'exit on error' enabled: %w", e.Error))
})
}
err = r.pipelineService.Init(ctx)
if err != nil {
return cerrors.Errorf("failed to init pipeline service: %w", err)
}

err = r.ProvisionService.Init(ctx)
if err != nil {
cerrors.ForEach(err, func(err error) {
r.logger.Err(ctx, err).Msg("provisioning failed")
})
if r.Config.Pipelines.ExitOnError {
r.logger.Warn(ctx).
Err(err).
Msg("Conduit will shut down due to a pipeline provisioning failure and 'exit on error' enabled")
err = cerrors.Errorf("shut down due to 'exit on error' enabled: %w", err)
return err
}
}

err = r.pipelineService.Run(ctx, r.connectorService, r.processorService, r.connectorPluginService)
if err != nil {
cerrors.ForEach(err, func(err error) {
r.logger.Err(ctx, err).Msg("pipeline failed to be started")
})
}

// APIs needed by connector plugins
_, err = r.startConnectorUtils(ctx, t)
if err != nil {
return cerrors.Errorf("failed to start connector utilities: %w", err)
return cerrors.Errorf("failed to initialize services: %w", err)
}

// Public gRPC and HTTP API
Expand Down Expand Up @@ -504,7 +475,7 @@ func (r *Runtime) startConnectorUtils(ctx context.Context, t *tomb.Tomb) (net.Ad
grpc.StatsHandler(r.gRPCStatsHandler),
)

schemaServiceAPI := pconduitserver.NewSchemaServiceServer(r.connSchemaService)
schemaServiceAPI := pconduitserver.NewSchemaServiceServer(r.schemaService)
conduitv1.RegisterSchemaServiceServer(grpcServer, schemaServiceAPI)

// Makes it easier to use command line tools to interact
Expand All @@ -515,7 +486,7 @@ func (r *Runtime) startConnectorUtils(ctx context.Context, t *tomb.Tomb) (net.Ad
// Names taken from schema.proto
healthServer := api.NewHealthServer(
map[string]api.Checker{
"SchemaService": r.connSchemaService,
"SchemaService": r.schemaService,
},
r.logger,
)
Expand Down Expand Up @@ -747,3 +718,66 @@ func (r *Runtime) serveHTTP(

return ln.Addr(), nil
}

func (r *Runtime) initServices(ctx context.Context, t *tomb.Tomb) error {
err := r.processorService.Init(ctx)
if err != nil {
return cerrors.Errorf("failed to init processor service: %w", err)
}

token := r.schemaService.Token()

// Initialize APIs needed by connector plugins
// Needs to be initialized before connectorPluginService
// because the standalone connector registry needs to run all plugins,
// and the plugins initialize a connector utils client when they are run.
connUtilsAddr, err := r.startConnectorUtils(ctx, t)
if err != nil {
return cerrors.Errorf("failed to start connector utilities API: %w", err)
}
r.logger.Info(ctx).Msgf("connector utilities started on %v", connUtilsAddr)

r.connectorPluginService.Init(ctx, connUtilsAddr.String(), token)

err = r.connectorService.Init(ctx)
if err != nil {
return cerrors.Errorf("failed to init connector service: %w", err)
}

if r.Config.Pipelines.ExitOnError {
r.pipelineService.OnFailure(func(e pipeline.FailureEvent) {
r.logger.Warn(ctx).
Err(e.Error).
Str(log.PipelineIDField, e.ID).
Msg("Conduit will shut down due to a pipeline failure and 'exit on error' enabled")
t.Kill(cerrors.Errorf("shut down due to 'exit on error' enabled: %w", e.Error))
})
}
err = r.pipelineService.Init(ctx)
if err != nil {
return cerrors.Errorf("failed to init pipeline service: %w", err)
}

err = r.ProvisionService.Init(ctx)
if err != nil {
cerrors.ForEach(err, func(err error) {
r.logger.Err(ctx, err).Msg("provisioning failed")
})
if r.Config.Pipelines.ExitOnError {
r.logger.Warn(ctx).
Err(err).
Msg("Conduit will shut down due to a pipeline provisioning failure and 'exit on error' enabled")
err = cerrors.Errorf("shut down due to 'exit on error' enabled: %w", err)
return err
}
}

err = r.pipelineService.Run(ctx, r.connectorService, r.processorService, r.connectorPluginService)
if err != nil {
cerrors.ForEach(err, func(err error) {
r.logger.Err(ctx, err).Msg("pipeline failed to be started")
})
}

return nil
}
4 changes: 2 additions & 2 deletions pkg/connector/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type Connector interface {

// PluginDispenserFetcher can fetch a plugin dispenser.
type PluginDispenserFetcher interface {
NewDispenser(logger log.CtxLogger, name string) (connectorPlugin.Dispenser, error)
NewDispenser(logger log.CtxLogger, name string, connectorID string) (connectorPlugin.Dispenser, error)
}

func (i *Instance) Init(logger log.CtxLogger, persister *Persister) {
Expand Down Expand Up @@ -120,7 +120,7 @@ func (i *Instance) Connector(_ context.Context, dispenserFetcher PluginDispenser
return nil, ErrConnectorRunning
}

pluginDispenser, err := dispenserFetcher.NewDispenser(i.logger, i.Plugin)
pluginDispenser, err := dispenserFetcher.NewDispenser(i.logger, i.Plugin, i.ID)
if err != nil {
return nil, cerrors.Errorf("failed to get plugin dispenser: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// fakePluginFetcher fulfills the PluginFetcher interface.
type fakePluginFetcher map[string]connectorPlugin.Dispenser

func (fpf fakePluginFetcher) NewDispenser(_ log.CtxLogger, name string) (connectorPlugin.Dispenser, error) {
func (fpf fakePluginFetcher) NewDispenser(_ log.CtxLogger, name string, _ string) (connectorPlugin.Dispenser, error) {
plug, ok := fpf[name]
if !ok {
return nil, plugin.ErrPluginNotFound
Expand Down
12 changes: 6 additions & 6 deletions pkg/orchestrator/mock/orchestrator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ type ProcessorService interface {

type ConnectorPluginService interface {
List(ctx context.Context) (map[string]pconnector.Specification, error)
NewDispenser(logger log.CtxLogger, name string) (connectorPlugin.Dispenser, error)
NewDispenser(logger log.CtxLogger, name string, connectorID string) (connectorPlugin.Dispenser, error)
ValidateSourceConfig(ctx context.Context, name string, settings map[string]string) error
ValidateDestinationConfig(ctx context.Context, name string, settings map[string]string) error
}
Expand Down
1 change: 1 addition & 0 deletions pkg/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func TestPipelineSimple(t *testing.T) {
conn_builtin.NewRegistry(logger, conn_builtin.DefaultBuiltinConnectors, schemaService),
conn_standalone.NewRegistry(logger, ""),
)
connPluginService.Init(ctx, "conn-utils-token:12345", "conn-utils-token")

procPluginService := proc_plugin.NewPluginService(
logger,
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type ProcessorService interface {

// PluginDispenserFetcher can fetch a plugin.
type PluginDispenserFetcher interface {
NewDispenser(logger log.CtxLogger, name string) (connectorPlugin.Dispenser, error)
NewDispenser(logger log.CtxLogger, name string, connectorID string) (connectorPlugin.Dispenser, error)
}

// Run runs pipelines that had the running state in store.
Expand Down
Loading

0 comments on commit 203aeb7

Please sign in to comment.