Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Schemas] Use configured schema service in Avro processors #1743

Merged
merged 14 commits into from
Aug 6, 2024
2 changes: 1 addition & 1 deletion pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func createServices(r *Runtime) error {

procPluginService := proc_plugin.NewPluginService(
r.logger,
proc_builtin.NewRegistry(r.logger, proc_builtin.DefaultBuiltinProcessors, procSchemaService),
proc_builtin.NewRegistry(r.logger, proc_builtin.DefaultBuiltinProcessors, schemaRegistry),
standaloneReg,
)

Expand Down
5 changes: 1 addition & 4 deletions pkg/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
conn_standalone "github.com/conduitio/conduit/pkg/plugin/connector/standalone"
proc_plugin "github.com/conduitio/conduit/pkg/plugin/processor"
proc_builtin "github.com/conduitio/conduit/pkg/plugin/processor/builtin"
"github.com/conduitio/conduit/pkg/plugin/processor/procutils"
"github.com/conduitio/conduit/pkg/processor"
"github.com/google/go-cmp/cmp"
"github.com/matryer/is"
Expand Down Expand Up @@ -85,11 +84,9 @@ func TestPipelineSimple(t *testing.T) {
)
connPluginService.Init(ctx, "conn-utils-token:12345")

procSchemaService := procutils.NewSchemaService(logger, schemaRegistry)

procPluginService := proc_plugin.NewPluginService(
logger,
proc_builtin.NewRegistry(logger, proc_builtin.DefaultBuiltinProcessors, procSchemaService),
proc_builtin.NewRegistry(logger, proc_builtin.DefaultBuiltinProcessors, schemaRegistry),
nil,
)

Expand Down
92 changes: 0 additions & 92 deletions pkg/plugin/processor/builtin/impl/avro/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@
package avro

import (
"crypto/tls"
"crypto/x509"
"fmt"
"os"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/plugin/processor/builtin/impl/avro/internal"
)
Expand Down Expand Up @@ -88,90 +83,3 @@ func (c *schemaConfig) parseAutoRegister() error {
}
return nil
}

type authConfig struct {
// The username to use with basic authentication. This option is required if
// auth.basic.password contains a value. If both auth.basic.username and auth.basic.password
// are empty basic authentication is disabled.
Username string `json:"basic.username"`
// The password to use with basic authentication. This option is required if
// auth.basic.username contains a value. If both auth.basic.username and auth.basic.password
// are empty basic authentication is disabled.
Password string `json:"basic.password"`
}

func (c *authConfig) validate() error {
switch {
case c.Username == "" && c.Password == "":
// no basic auth set
return nil
case c.Username == "":
return cerrors.Errorf("specify a username to enable basic auth or remove field password")
case c.Password == "":
return cerrors.Errorf("specify a password to enable basic auth or remove field username")
}

return nil
}

type clientCert struct {
// The path to a file containing a PEM encoded certificate. This option is required
// if tls.client.key contains a value. If both tls.client.cert and tls.client.key are empty
// TLS is disabled.
Cert string `json:"cert"`
// The path to a file containing a PEM encoded private key. This option is required
// if tls.client.cert contains a value. If both tls.client.cert and tls.client.key are empty
// TLS is disabled.
Key string `json:"key"`
}

type tlsConfig struct {
// The path to a file containing PEM encoded CA certificates. If this option is empty,
// Conduit falls back to using the host's root CA set.
CACert string `json:"ca.cert"`

Client clientCert `json:"client"`

tlsClientCert *tls.Certificate
tlsCACert *x509.CertPool
}

func (c *tlsConfig) parse() error {
if c.Client.Cert == "" && c.Client.Key == "" && c.CACert == "" {
// no tls config set
return nil
} else if c.Client.Cert == "" || c.Client.Key == "" {
// we are missing some configuration fields
errs := []error{cerrors.New("invalid TLS config")}
if c.Client.Cert == "" {
errs = append(errs, cerrors.New("missing field: tls.client.cert"))
}
if c.Client.Key == "" {
errs = append(errs, cerrors.New("missing field: tls.client.key"))
}
// CA cert is optional, we don't check if it's missing
return cerrors.Join(errs...)
}

clientCert, err := tls.LoadX509KeyPair(c.Client.Cert, c.Client.Key)
if err != nil {
return fmt.Errorf("failed to load client certificate: %w", err)
}

c.tlsClientCert = &clientCert

if c.CACert != "" {
// load custom CA cert
caCert, err := os.ReadFile(c.CACert)
if err != nil {
return fmt.Errorf("failed to load CA certificate: %w", err)
}
caCertPool := x509.NewCertPool()
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
return cerrors.New("invalid CA cert")
}
c.tlsCACert = caCertPool
}

return nil
}
91 changes: 1 addition & 90 deletions pkg/plugin/processor/builtin/impl/avro/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,11 @@ func TestConfig_Parse(t *testing.T) {
{
name: "preRegistered",
input: config.Config{
"url": "http://localhost",
"schema.strategy": "preRegistered",
"schema.preRegistered.subject": "testsubject",
"schema.preRegistered.version": "123",
},
want: encodeConfig{
URL: "http://localhost",
Field: ".Payload.After",
Schema: schemaConfig{
StrategyType: "preRegistered",
Expand All @@ -55,7 +53,6 @@ func TestConfig_Parse(t *testing.T) {
{
name: "preRegistered without version",
input: config.Config{
"url": "http://localhost",
"schema.strategy": "preRegistered",
"schema.preRegistered.subject": "testsubject",
},
Expand All @@ -64,7 +61,6 @@ func TestConfig_Parse(t *testing.T) {
{
name: "preRegistered without subject",
input: config.Config{
"url": "http://localhost",
"schema.strategy": "preRegistered",
"schema.preRegistered.version": "123",
},
Expand All @@ -73,12 +69,10 @@ func TestConfig_Parse(t *testing.T) {
{
name: "autoRegister",
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
},
want: encodeConfig{
URL: "http://localhost",
Field: ".Payload.After",
Schema: schemaConfig{
StrategyType: "autoRegister",
Expand All @@ -89,111 +83,28 @@ func TestConfig_Parse(t *testing.T) {
{
name: "autoRegister without subject",
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
},
wantErr: cerrors.New("failed parsing schema strategy: subject required for schema strategy 'autoRegister'"),
},
{
name: "non-default target field",
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
"field": ".Payload.After.something",
},
want: encodeConfig{
Field: ".Payload.After.something",
URL: "http://localhost",
Schema: schemaConfig{
StrategyType: "autoRegister",
AutoRegisteredSubject: "testsubject",
},
},
},
{
name: "valid auth",
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
"auth.basic.username": "[email protected]",
"auth.basic.password": "Passw0rd",
},
want: encodeConfig{
URL: "http://localhost",
Field: ".Payload.After",
Schema: schemaConfig{
StrategyType: "autoRegister",
AutoRegisteredSubject: "testsubject",
},
Auth: authConfig{
Username: "[email protected]",
Password: "Passw0rd",
},
},
},
{
name: "auth -- no username",
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
"auth.basic.password": "Passw0rd",
},
wantErr: cerrors.New("invalid basic auth: specify a username to enable basic auth or remove field password"),
},
{
name: "auth -- no password",
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
"auth.basic.username": "[email protected]",
},
wantErr: cerrors.New("invalid basic auth: specify a password to enable basic auth or remove field username"),
},
{
name: "tls: missing client cert and key",
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
"tls.ca.cert": "/tmp/something",
},
wantErr: cerrors.New(`failed parsing TLS: invalid TLS config
missing field: tls.client.cert
missing field: tls.client.key`),
},
{
name: "valid tls",
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
"tls.ca.cert": "testdata/cert.pem",
"tls.client.cert": "testdata/ca.pem",
"tls.client.key": "testdata/ca-key.pem",
},
want: encodeConfig{
Field: ".Payload.After",
URL: "http://localhost",
Schema: schemaConfig{
StrategyType: "autoRegister",
AutoRegisteredSubject: "testsubject",
},
TLS: tlsConfig{
CACert: "testdata/cert.pem",
Client: clientCert{
Cert: "testdata/ca.pem",
Key: "testdata/ca-key.pem",
},
},
},
},
}

cmpOpts := cmpopts.IgnoreUnexported(encodeConfig{}, schemaConfig{}, tlsConfig{})
cmpOpts := cmpopts.IgnoreUnexported(encodeConfig{}, schemaConfig{})
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
Expand Down
Loading