Skip to content

Commit

Permalink
Builtin processors refactoring, housekeeping (#1402)
Browse files Browse the repository at this point in the history
* refactor builtin processors, generate specs into separate files

* move processors into folder impl

* go mod tidy

* add example summaries

* add example summaries

* fix name of avro.decode processor

* add example summaries

* regenerate specs

* update default registry

* update default registry: fix tests, linter, regenerate

* dep upgrades

* refactorings

* move json encode processor to impl/json
* rename field.subset.exclude to field.exclude
* rename constructor for unwrap.debezium processor
* regenerate specs

* dep downgrades

---------

Co-authored-by: Haris Osmanagic <[email protected]>
  • Loading branch information
lovromazgon and hariso authored Mar 7, 2024
1 parent 95e1a55 commit d2c1b77
Show file tree
Hide file tree
Showing 129 changed files with 2,888 additions and 2,307 deletions.
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/Masterminds/sprig/v3 v3.2.3
github.com/NYTimes/gziphandler v1.1.1
github.com/bufbuild/buf v1.29.0
github.com/conduitio/conduit-commons v0.0.0-20240229152559-fc28d3c4fc2a
github.com/conduitio/conduit-commons v0.0.1
github.com/conduitio/conduit-connector-file v0.6.0
github.com/conduitio/conduit-connector-generator v0.5.0
github.com/conduitio/conduit-connector-kafka v0.7.1
Expand Down Expand Up @@ -49,7 +49,7 @@ require (
go.uber.org/goleak v1.3.0
go.uber.org/mock v0.4.0
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
golang.org/x/tools v0.18.0
golang.org/x/tools v0.19.0
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028
google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe
google.golang.org/grpc v1.61.0
Expand Down Expand Up @@ -325,13 +325,13 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp/typeparams v0.0.0-20231219180239-dc181d75b848 // indirect
golang.org/x/mod v0.15.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/mod v0.16.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/term v0.17.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto v0.0.0-20240116215550-a9fa1716bcac // indirect
Expand Down
28 changes: 14 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1080,8 +1080,8 @@ github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWH
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c=
github.com/conduitio/conduit-commons v0.0.0-20240229152559-fc28d3c4fc2a h1:ws7NuQWtrgzJeroWcU+gVFJk80khMnIq6Uam/vz1FAw=
github.com/conduitio/conduit-commons v0.0.0-20240229152559-fc28d3c4fc2a/go.mod h1:Hhr5nik71/Wz3yrLjaKSZ2HRQp1wNSOPY2JGGKz7fsw=
github.com/conduitio/conduit-commons v0.0.1 h1:UjY/dsfr88N0l8SdJ3hBOYq38qYmV6DEe5taxvdYjds=
github.com/conduitio/conduit-commons v0.0.1/go.mod h1:WjmUZpazjAolYz8SB++xvJlN47ro3AStDYhwRCK/yRc=
github.com/conduitio/conduit-connector-file v0.6.0 h1:8tsGeGhKvFwYQZztOOL5/tmOhVShsfo9lQ3b/0fX8kQ=
github.com/conduitio/conduit-connector-file v0.6.0/go.mod h1:ju7PiB4kTJgqng4KVXDt/Gvw/53kFwSzi5Ez9EDXxNI=
github.com/conduitio/conduit-connector-generator v0.5.0 h1:zpXHif89DCJ13nftKLv31uI2AJGicpY5H1V7SwldRNo=
Expand Down Expand Up @@ -2152,8 +2152,8 @@ golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0
golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -2221,8 +2221,8 @@ golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.15.0 h1:SernR4v+D55NyBH2QiEQrlBAnj1ECL6AGrA5+dPaMY8=
golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic=
golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -2297,8 +2297,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ=
golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -2465,8 +2465,8 @@ golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand All @@ -2481,8 +2481,8 @@ golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.9.0/go.mod h1:M6DEAAIenWoTxdKrOltXcmDY3rSplQUkrvaDU5FcQyo=
golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/term v0.17.0 h1:mkTF7LCd6WGJNL3K1Ad7kwxNfYAW6a8a8QqtMblp/4U=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down Expand Up @@ -2604,8 +2604,8 @@ golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
golang.org/x/tools v0.8.0/go.mod h1:JxBZ99ISMI5ViVkT1tr6tdNmXeTrcpVSD3vZ1RsRdN4=
golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc=
golang.org/x/tools v0.10.0/go.mod h1:UJwyiVBsOA2uwvK/e5OY3GTpDUJriEd+/YlqAwLPmyM=
golang.org/x/tools v0.18.0 h1:k8NLag8AGHnn+PHbl7g43CtqZAwG60vZkLqgyZgIHgQ=
golang.org/x/tools v0.18.0/go.mod h1:GL7B4CwcLLeo59yx/9UWWuNOW1n3VZ4f5axWfML7Lcg=
golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw=
golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/multierror"
"github.com/conduitio/conduit/pkg/plugin/processor/builtin/avro/schemaregistry"
"github.com/conduitio/conduit/pkg/plugin/processor/builtin/impl/avro/schemaregistry"
"github.com/lovromazgon/franz-go/pkg/sr"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin/processor/builtin/avro/schemaregistry"
"github.com/conduitio/conduit/pkg/plugin/processor/builtin/impl/avro/schemaregistry"
"github.com/lovromazgon/franz-go/pkg/sr"
)

Expand Down Expand Up @@ -107,7 +107,7 @@ func NewDecodeProcessor(logger log.CtxLogger) sdk.Processor {

func (p *decodeProcessor) Specification() (sdk.Specification, error) {
return sdk.Specification{
Name: "avro.encode",
Name: "avro.decode",
Summary: "Decodes a field's raw data in the Avro format",
Description: `The processor takes raw data (bytes or a string) in the specified field and decodes
it from the [Avro format](https://avro.apache.org/) into structured data. It extracts the schema ID from the data,
Expand Down
117 changes: 117 additions & 0 deletions pkg/plugin/processor/builtin/impl/avro/decode_examples_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !integration

package avro

import (
"context"
"fmt"

"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin/processor/builtin/impl/avro/schemaregistry"
"github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal/exampleutil"
"github.com/lovromazgon/franz-go/pkg/sr"
)

//nolint:govet // a more descriptive example description
func ExampleDecodeProcessor() {
url, cleanup := schemaregistry.ExampleSchemaRegistryURL("ExampleDecodeProcessor", 54322)
defer cleanup()

client, err := schemaregistry.NewClient(log.Nop(), sr.URLs(url))
if err != nil {
panic(fmt.Sprintf("failed to create schema registry client: %v", err))
}

_, err = client.CreateSchema(context.Background(), "example-decode", sr.Schema{
Type: sr.TypeAvro,
Schema: `
{
"type":"record",
"name":"record",
"fields":[
{"name":"myString","type":"string"},
{"name":"myInt","type":"int"}
]
}`,
})
if err != nil {
panic(fmt.Sprintf("failed to create schema: %v", err))
}

p := NewDecodeProcessor(log.Nop())

exampleutil.RunExample(p, exampleutil.Example{
Summary: "Decode a record field in Avro format",
Description: `This example shows the usage of the ` + "`avro.decode`" + ` processor.
The processor decodes the record's` + "`.Key`" + ` field using the schema that is
downloaded from the schema registry and needs to exist under the subject` + "`example-decode`" + `.
In this example we use the following schema:
` + "```json" + `
{
"type":"record",
"name":"record",
"fields":[
{"name":"myString","type":"string"},
{"name":"myInt","type":"int"}
]
}
` + "```",
Config: map[string]string{
"url": url,
"field": ".Key",
},
Have: opencdc.Record{
Position: opencdc.Position("test-position"),
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"key1": "val1"},
Key: opencdc.RawData([]byte{0, 0, 0, 0, 1, 6, 98, 97, 114, 2}),
},
Want: sdk.SingleRecord{
Position: opencdc.Position("test-position"),
Operation: opencdc.OperationCreate,
Metadata: map[string]string{"key1": "val1"},
Key: opencdc.StructuredData{
"myString": "bar",
"myInt": 1,
},
}})

// Output:
// processor transformed record:
// --- before
// +++ after
// @@ -1,12 +1,15 @@
// {
// "position": "dGVzdC1wb3NpdGlvbg==",
// "operation": "create",
// "metadata": {
// "key1": "val1"
// },
// - "key": "\u0000\u0000\u0000\u0000\u0001\u0006bar\u0002",
// + "key": {
// + "myInt": 1,
// + "myString": "bar"
// + },
// "payload": {
// "before": null,
// "after": null
// }
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"testing"

"github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal"

"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/log"
Expand Down Expand Up @@ -81,7 +83,7 @@ func TestDecodeProcessor_Process_RawData_CustomField(t *testing.T) {

got := underTest.Process(ctx, []opencdc.Record{input})
is.Equal(1, len(got))
is.Equal("", cmp.Diff(want, got[0], cmpProcessedRecordOpts...))
is.Equal("", cmp.Diff(want, got[0], internal.CmpProcessedRecordOpts...))
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin/processor/builtin/avro/schemaregistry"
"github.com/conduitio/conduit/pkg/plugin/processor/builtin/impl/avro/schemaregistry"
"github.com/goccy/go-json"
"github.com/lovromazgon/franz-go/pkg/sr"
)
Expand Down
Loading

0 comments on commit d2c1b77

Please sign in to comment.