Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update processor SDK, add middleware #1742

Merged
merged 8 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240802103310-fd4ab945b1ac
github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240802092651-67dc543a6c90
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240802133134-6635ddc2aff6
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240731185647-3b6f7d3b4b71
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240803121812-e641b45ecd3f
github.com/conduitio/conduit-schema-registry v0.0.0-20240725155046-a0fdb61d2588
github.com/conduitio/yaml/v3 v3.3.0
github.com/dop251/goja v0.0.0-20231027120936-b396bb4c349d
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240802092651-67dc543a6c90 h
github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240802092651-67dc543a6c90/go.mod h1:Cg0rM0NJdIO+CQZWyXtikUVxVainUUq0MqQr8sWnO8E=
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240802133134-6635ddc2aff6 h1:yX2SddKRmb1gMi94umalIKF+8+hipknGAEFNXz0+B+E=
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240802133134-6635ddc2aff6/go.mod h1:R2V+ZXCFIeIIv8xxZHsi6NGntgsimsEbhQg4ezhna+0=
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240731185647-3b6f7d3b4b71 h1:TCHq3L/LS9Ngo2a4h39vSNpXXisTox1l5uXdwPxxtNM=
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240731185647-3b6f7d3b4b71/go.mod h1:n6VqVO07olTlvIUSHf2kZcU8cgu2jGmuO6bFrQST2v8=
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240803121812-e641b45ecd3f h1:nL2ah3mSPnDVV0sLhSVJQXR+1xsrQdAZ6Je8GB+Ssd4=
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240803121812-e641b45ecd3f/go.mod h1:kQ+7bUREM+F2L/yPPM/GF8z2lqqmpx8Su+ioyE1uO8Q=
github.com/conduitio/conduit-schema-registry v0.0.0-20240725155046-a0fdb61d2588 h1:/OBjxI1JjE3AmifouogZ2KvlhGJ9tQGk4X7UxwjHo1o=
github.com/conduitio/conduit-schema-registry v0.0.0-20240725155046-a0fdb61d2588/go.mod h1:G5t9W5Z5Mn0nW1TNnIQ1al4piqRXjc1R7HjdHgGFCx4=
github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI=
Expand Down
2 changes: 1 addition & 1 deletion pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func createServices(r *Runtime) error {

procPluginService := proc_plugin.NewPluginService(
r.logger,
proc_builtin.NewRegistry(r.logger, proc_builtin.DefaultBuiltinProcessors),
proc_builtin.NewRegistry(r.logger, proc_builtin.DefaultBuiltinProcessors, procSchemaService),
standaloneReg,
)

Expand Down
9 changes: 6 additions & 3 deletions pkg/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
conn_standalone "github.com/conduitio/conduit/pkg/plugin/connector/standalone"
proc_plugin "github.com/conduitio/conduit/pkg/plugin/processor"
proc_builtin "github.com/conduitio/conduit/pkg/plugin/processor/builtin"
"github.com/conduitio/conduit/pkg/plugin/processor/procutils"
"github.com/conduitio/conduit/pkg/processor"
"github.com/google/go-cmp/cmp"
"github.com/matryer/is"
Expand Down Expand Up @@ -74,19 +75,21 @@ func TestPipelineSimple(t *testing.T) {
schemaRegistry, err := schemaregistry.NewSchemaRegistry(db)
is.NoErr(err)
authManager := connutils.NewAuthManager()
schemaService := connutils.NewSchemaService(logger, schemaRegistry, authManager)
connSchemaService := connutils.NewSchemaService(logger, schemaRegistry, authManager)

connPluginService := conn_plugin.NewPluginService(
logger,
conn_builtin.NewRegistry(logger, conn_builtin.DefaultBuiltinConnectors, schemaService),
conn_builtin.NewRegistry(logger, conn_builtin.DefaultBuiltinConnectors, connSchemaService),
conn_standalone.NewRegistry(logger, ""),
authManager,
)
connPluginService.Init(ctx, "conn-utils-token:12345")

procSchemaService := procutils.NewSchemaService(logger, schemaRegistry)

procPluginService := proc_plugin.NewPluginService(
logger,
proc_builtin.NewRegistry(logger, proc_builtin.DefaultBuiltinProcessors),
proc_builtin.NewRegistry(logger, proc_builtin.DefaultBuiltinProcessors, procSchemaService),
nil,
)

Expand Down
25 changes: 13 additions & 12 deletions pkg/plugin/processor/builtin/impl/avro/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"testing"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand All @@ -27,13 +28,13 @@ import (
func TestConfig_Parse(t *testing.T) {
testCases := []struct {
name string
input map[string]string
input config.Config
want encodeConfig
wantErr error
}{
{
name: "preRegistered",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "preRegistered",
"schema.preRegistered.subject": "testsubject",
Expand All @@ -53,7 +54,7 @@ func TestConfig_Parse(t *testing.T) {
},
{
name: "preRegistered without version",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "preRegistered",
"schema.preRegistered.subject": "testsubject",
Expand All @@ -62,7 +63,7 @@ func TestConfig_Parse(t *testing.T) {
},
{
name: "preRegistered without subject",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "preRegistered",
"schema.preRegistered.version": "123",
Expand All @@ -71,7 +72,7 @@ func TestConfig_Parse(t *testing.T) {
},
{
name: "autoRegister",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
Expand All @@ -87,15 +88,15 @@ func TestConfig_Parse(t *testing.T) {
},
{
name: "autoRegister without subject",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
},
wantErr: cerrors.New("failed parsing schema strategy: subject required for schema strategy 'autoRegister'"),
},
{
name: "non-default target field",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
Expand All @@ -112,7 +113,7 @@ func TestConfig_Parse(t *testing.T) {
},
{
name: "valid auth",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
Expand All @@ -134,7 +135,7 @@ func TestConfig_Parse(t *testing.T) {
},
{
name: "auth -- no username",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
Expand All @@ -144,7 +145,7 @@ func TestConfig_Parse(t *testing.T) {
},
{
name: "auth -- no password",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
Expand All @@ -154,7 +155,7 @@ func TestConfig_Parse(t *testing.T) {
},
{
name: "tls: missing client cert and key",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
Expand All @@ -166,7 +167,7 @@ missing field: tls.client.key`),
},
{
name: "valid tls",
input: map[string]string{
input: config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
Expand Down
3 changes: 2 additions & 1 deletion pkg/plugin/processor/builtin/impl/avro/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"crypto/tls"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
Expand Down Expand Up @@ -126,7 +127,7 @@ This processor is the counterpart to [` + "`avro.encode`" + `](/docs/processors/
}, nil
}

func (p *decodeProcessor) Configure(ctx context.Context, m map[string]string) error {
func (p *decodeProcessor) Configure(ctx context.Context, m config.Config) error {
lovromazgon marked this conversation as resolved.
Show resolved Hide resolved
cfg, err := parseDecodeConfig(ctx, m)
if err != nil {
return cerrors.Errorf("invalid config: %w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/log"
Expand Down Expand Up @@ -74,7 +75,7 @@ In this example we use the following schema:
]
}
` + "```",
Config: map[string]string{
Config: config.Config{
"url": url,
"field": ".Key",
},
Expand Down
5 changes: 3 additions & 2 deletions pkg/plugin/processor/builtin/impl/avro/decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"testing"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/log"
Expand Down Expand Up @@ -52,7 +53,7 @@ func TestDecodeProcessor_Process_RawData_CustomField(t *testing.T) {
is := is.New(t)
ctx := context.Background()

config := map[string]string{
cfg := config.Config{
"url": "http://localhost",
"field": ".Payload.Before.something",
}
Expand All @@ -70,7 +71,7 @@ func TestDecodeProcessor_Process_RawData_CustomField(t *testing.T) {
want.Payload.Before.(opencdc.StructuredData)["something"] = decodedVal

underTest := NewDecodeProcessor(log.Nop())
err := underTest.Configure(ctx, config)
err := underTest.Configure(ctx, cfg)
is.NoErr(err)

// skipping Open(), so we can inject a mock encoder
Expand Down
5 changes: 3 additions & 2 deletions pkg/plugin/processor/builtin/impl/avro/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"crypto/tls"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
Expand Down Expand Up @@ -71,7 +72,7 @@ func (c encodeConfig) ClientOptions() []sr.ClientOpt {
return clientOpts
}

func parseEncodeConfig(ctx context.Context, m map[string]string) (encodeConfig, error) {
func parseEncodeConfig(ctx context.Context, m config.Config) (encodeConfig, error) {
lovromazgon marked this conversation as resolved.
Show resolved Hide resolved
cfg := encodeConfig{}
err := sdk.ParseConfig(ctx, m, &cfg, cfg.Parameters())
if err != nil {
Expand Down Expand Up @@ -146,7 +147,7 @@ This processor is the counterpart to [` + "`avro.decode`" + `](/docs/processors/
}, nil
}

func (p *encodeProcessor) Configure(ctx context.Context, m map[string]string) error {
func (p *encodeProcessor) Configure(ctx context.Context, m config.Config) error {
lovromazgon marked this conversation as resolved.
Show resolved Hide resolved
cfg, err := parseEncodeConfig(ctx, m)
if err != nil {
return cerrors.Errorf("invalid config: %w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/log"
Expand All @@ -42,7 +43,7 @@ func ExampleEncodeProcessor_autoRegister() {
with the ` + "`autoRegister`" + ` schema strategy. The processor encodes the record's
` + "`.Payload.After`" + ` field using the schema that is extracted from the data
and registered on the fly under the subject ` + "`example-autoRegister`" + `.`,
Config: map[string]string{
Config: config.Config{
"url": url,
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "example-autoRegister",
Expand Down Expand Up @@ -154,7 +155,7 @@ schema has to be manually pre-registered. In this example we use the following s
` + "```" + `

The processor encodes the record's` + "`.Key`" + ` field using the above schema.`,
Config: map[string]string{
Config: config.Config{
"url": url,
"schema.strategy": "preRegistered",
"schema.preRegistered.subject": "example-preRegistered",
Expand Down
17 changes: 9 additions & 8 deletions pkg/plugin/processor/builtin/impl/avro/encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"testing"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/log"
Expand All @@ -31,7 +32,7 @@ func TestEncodeProcessor_Process_StructuredData(t *testing.T) {
is := is.New(t)
ctx := context.Background()

config := map[string]string{
cfg := config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
Expand All @@ -51,7 +52,7 @@ func TestEncodeProcessor_Process_StructuredData(t *testing.T) {
want.Payload.After = opencdc.RawData("encoded")

underTest := NewEncodeProcessor(log.Nop())
err := underTest.Configure(ctx, config)
err := underTest.Configure(ctx, cfg)
is.NoErr(err)

// skipping Open(), so we can inject a mock encoder
Expand All @@ -70,7 +71,7 @@ func TestEncodeProcessor_Process_RawData(t *testing.T) {
is := is.New(t)
ctx := context.Background()

config := map[string]string{
cfg := config.Config{
"url": "http://localhost",
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
Expand All @@ -88,7 +89,7 @@ func TestEncodeProcessor_Process_RawData(t *testing.T) {
want.Payload.After = opencdc.RawData("encoded")

underTest := NewEncodeProcessor(log.Nop())
err := underTest.Configure(ctx, config)
err := underTest.Configure(ctx, cfg)
is.NoErr(err)

// skipping Open(), so we can inject a mock encoder
Expand Down Expand Up @@ -127,7 +128,7 @@ func TestEncodeProcessor_Process_RawData_CustomField(t *testing.T) {
is := is.New(t)
ctx := context.Background()

config := map[string]string{
cfg := config.Config{
"url": "http://localhost",
"field": ".Payload.Before.something",
"schema.strategy": "autoRegister",
Expand All @@ -147,7 +148,7 @@ func TestEncodeProcessor_Process_RawData_CustomField(t *testing.T) {
want.Payload.Before.(opencdc.StructuredData)["something"] = encodedValue

underTest := NewEncodeProcessor(log.Nop())
err := underTest.Configure(ctx, config)
err := underTest.Configure(ctx, cfg)
is.NoErr(err)

// skipping Open(), so we can inject a mock encoder
Expand Down Expand Up @@ -225,7 +226,7 @@ func TestEncodeProcessor_Process_EmptyPayloadField(t *testing.T) {
is := is.New(t)
ctx := context.Background()

config := map[string]string{
cfg := config.Config{
"url": "http://localhost",
"field": tc.field,
"schema.strategy": "autoRegister",
Expand All @@ -237,7 +238,7 @@ func TestEncodeProcessor_Process_EmptyPayloadField(t *testing.T) {
want.Payload.Before = tc.wantPayloadBefore

underTest := NewEncodeProcessor(log.Nop())
err := underTest.Configure(ctx, config)
err := underTest.Configure(ctx, cfg)
is.NoErr(err)

// skipping Open(), so we can inject a mock encoder
Expand Down
3 changes: 2 additions & 1 deletion pkg/plugin/processor/builtin/impl/base64/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/base64"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
Expand Down Expand Up @@ -57,7 +58,7 @@ result in the target field. It is not allowed to decode the ` + "`.Position`" +
}, nil
}

func (p *decodeProcessor) Configure(ctx context.Context, m map[string]string) error {
func (p *decodeProcessor) Configure(ctx context.Context, m config.Config) error {
err := sdk.ParseConfig(ctx, m, &p.config, p.config.Parameters())
if err != nil {
return cerrors.Errorf("failed to parse configuration: %w", err)
Expand Down
Loading