BP: add builtin processor part#1 #1371

Merged
merged 34 commits into from
Mar 1, 2024
Changes from 31 commits
fc2f857
add processors field.set & field.subset.exclude
maha-hajja Feb 8, 2024
5171329
delete extra test
maha-hajja Feb 8, 2024
0f5de56
fix tests
maha-hajja Feb 9, 2024
5542a96
add field.rename processor
maha-hajja Feb 9, 2024
37ba980
add exclusion validation to field.rename processor
maha-hajja Feb 9, 2024
b0c41e3
refactor + fix error return in Process method
maha-hajja Feb 15, 2024
27c7eff
add field.convert processor
maha-hajja Feb 15, 2024
dbe15e7
add filter processor
maha-hajja Feb 15, 2024
a8c043d
only structured data
maha-hajja Feb 15, 2024
2ffad51
address reviews part1
maha-hajja Feb 17, 2024
a229f67
use paramgen, update Configure method, update tests
maha-hajja Feb 20, 2024
2ceabcf
evaluate value for processor setField
maha-hajja Feb 20, 2024
9d5eb3c
use a slice of referenceResolvers for processors with multiple fields…
maha-hajja Feb 21, 2024
f601828
do not allow .Position to be set in setField processor
maha-hajja Feb 22, 2024
e6c9a5d
update setField test, add non existent field test
maha-hajja Feb 22, 2024
378624c
use the new Rename() method from referenceResolver
maha-hajja Feb 23, 2024
e881814
Merge branch 'feature/better-processors' into maha/rewrite-processors1
maha-hajja Feb 23, 2024
31f79af
address reviews
maha-hajja Feb 28, 2024
98d8492
address reviews2
maha-hajja Feb 28, 2024
80c6661
fix renameField bug + add New() method to each processor
maha-hajja Feb 28, 2024
bc7af3f
linter fix
maha-hajja Feb 28, 2024
ecfbcf5
add processor examples
maha-hajja Feb 29, 2024
0f7a327
processor json
maha-hajja Feb 29, 2024
8e3e5b0
Update pkg/plugin/processor/builtin/convertField.go
maha-hajja Feb 29, 2024
a35e0c8
address reviews
maha-hajja Feb 29, 2024
94fd987
Merge branch 'maha/rewrite-processors1' of github.com:ConduitIO/condu…
maha-hajja Feb 29, 2024
3b3d9f5
setFeild example
maha-hajja Feb 29, 2024
f73ebab
Merge branch 'feature/better-processors' into maha/rewrite-processors1
maha-hajja Feb 29, 2024
b35620b
setFeild example
maha-hajja Feb 29, 2024
ecba048
make generate
maha-hajja Feb 29, 2024
e7bfd33
fix paramgen regex
maha-hajja Mar 1, 2024
eb325c3
generate
maha-hajja Mar 1, 2024
7fd4864
fix regex
maha-hajja Mar 1, 2024
f782d35
add paramgen to tools
maha-hajja Mar 1, 2024
2 changes: 1 addition & 1 deletion go.mod
@@ -17,7 +17,7 @@ require (
 github.com/conduitio/conduit-connector-protocol v0.5.1-0.20240104160905-e9e61586fb8d
 github.com/conduitio/conduit-connector-s3 v0.5.1
 github.com/conduitio/conduit-connector-sdk v0.8.0
-github.com/conduitio/conduit-processor-sdk v0.0.0-20240216180055-cbdc5dcb5d31
+github.com/conduitio/conduit-processor-sdk v0.0.0-20240228181202-04383fd82d29
 github.com/conduitio/yaml/v3 v3.3.0
 github.com/dgraph-io/badger/v4 v4.2.0
 github.com/gammazero/deque v0.2.1
4 changes: 2 additions & 2 deletions go.sum
@@ -1095,8 +1095,8 @@ github.com/conduitio/conduit-connector-s3 v0.5.1 h1:yRo8004ryCIZc/S3iWQ1rN6pm6bj
 github.com/conduitio/conduit-connector-s3 v0.5.1/go.mod h1:nbxzsyS95gbFJ28Job9vFFB+byRFINSv70/13Yi4mKQ=
 github.com/conduitio/conduit-connector-sdk v0.8.0 h1:gvchqoj5d3AQsBoIosx4i32L8Ex9+5BuAyHi/IM9VD4=
 github.com/conduitio/conduit-connector-sdk v0.8.0/go.mod h1:nOz4K3X6fD8YMe5CPbULwSEE18Eu02ZrpT6o6KwQfxs=
-github.com/conduitio/conduit-processor-sdk v0.0.0-20240216180055-cbdc5dcb5d31 h1:a4x/bVFMZrPGnOM502FwbVrM0dfrttL1FZvKqTtYVP0=
-github.com/conduitio/conduit-processor-sdk v0.0.0-20240216180055-cbdc5dcb5d31/go.mod h1:F/tmVZiXzZY60bzUSqwStM1TPaGvmQ9b1n1LuVtAOio=
+github.com/conduitio/conduit-processor-sdk v0.0.0-20240228181202-04383fd82d29 h1:X6e4OnuJOHb3znmkvSPEf1hVJB7aWAOW7jZX+fMhLf8=
+github.com/conduitio/conduit-processor-sdk v0.0.0-20240228181202-04383fd82d29/go.mod h1:F/tmVZiXzZY60bzUSqwStM1TPaGvmQ9b1n1LuVtAOio=
 github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI=
 github.com/conduitio/yaml/v3 v3.3.0/go.mod h1:JNgFMOX1t8W4YJuRZOh6GggVtSMsgP9XgTw+7dIenpc=
 github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
156 changes: 156 additions & 0 deletions pkg/plugin/processor/builtin/convertField.go
@@ -0,0 +1,156 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate paramgen -output=convertField_paramgen.go convertFieldConfig

package builtin

import (
"context"
"fmt"
"strconv"

"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
)

type convertField struct {
referenceResolver sdk.ReferenceResolver
config convertFieldConfig

sdk.UnimplementedProcessor
}

func newConvertField() *convertField {
return &convertField{}
}

type convertFieldConfig struct {
// Field is the target field, as it would be addressed in a Go template (e.g. `.Payload.After.foo`).
// You can only convert fields that are under .Key and .Payload, and said fields should contain structured data.
Field string `json:"field" validate:"required,regex=^\\.(Payload|Key).*"`
// Type is the target field type after conversion. Available options are: string, int, float, bool.
Type string `json:"type" validate:"required,inclusion=string|int|float|bool"`
}

func (p *convertField) Specification() (sdk.Specification, error) {
return sdk.Specification{
Name: "field.convert",
Summary: "Convert the type of a field.",
Description: `Convert takes a field of one type and converts it into another type (e.g. string to integer).
The applicable types are string, int, float and bool. Converting can be done between any combination of types. Note that
when converting to int or float, booleans become the numeric values 1 (true) and 0 (false). The processor is only
applicable to fields under .Key, .Payload.Before and .Payload.After, and only if those fields contain structured data.
If the record contains raw JSON data, then use the processor "decode.json" to parse it into structured data first.`,
Version: "v0.1.0",
Author: "Meroxa, Inc.",
Parameters: convertFieldConfig{}.Parameters(),
}, nil
}

func (p *convertField) Configure(ctx context.Context, m map[string]string) error {
err := sdk.ParseConfig(ctx, m, &p.config, convertFieldConfig{}.Parameters())
if err != nil {
return cerrors.Errorf("failed to parse configuration: %w", err)
}

resolver, err := sdk.NewReferenceResolver(p.config.Field)
if err != nil {
return cerrors.Errorf("failed to parse the %q param: %w", "field", err)
}
p.referenceResolver = resolver
return nil
}

func (p *convertField) Open(context.Context) error {
return nil
}

func (p *convertField) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
out := make([]sdk.ProcessedRecord, 0, len(records))
for _, record := range records {
rec := record
ref, err := p.referenceResolver.Resolve(&rec)
if err != nil {
return append(out, sdk.ErrorRecord{Error: err})
}
newVal, err := p.stringToType(p.toString(ref.Get()), p.config.Type)
if err != nil {
return append(out, sdk.ErrorRecord{Error: err})
}
err = ref.Set(newVal)
if err != nil {
return append(out, sdk.ErrorRecord{Error: err})
}
out = append(out, sdk.SingleRecord(rec))
}
return out
}

func (p *convertField) stringToType(value, typ string) (any, error) {
switch typ {
case "string":
return value, nil
case "int":
newVal, err := strconv.Atoi(value)
if err != nil {
return nil, err
}
return newVal, nil
case "float":
newVal, err := strconv.ParseFloat(value, 64)
if err != nil {
return nil, err
}
return newVal, nil
case "bool":
newVal, err := strconv.ParseBool(value)
if err != nil {
return nil, err
}
return newVal, nil
default:
return nil, cerrors.Errorf("undefined type %q", typ)
}
}

func (p *convertField) toString(value any) string {
switch v := value.(type) {
case string:
return v
case int:
return strconv.Itoa(v)
case float64:
return strconv.FormatFloat(v, 'f', -1, 64)
case bool:
if p.config.Type == "int" || p.config.Type == "float" {
return p.boolToStringNumber(v)
}
return strconv.FormatBool(v)
default:
return fmt.Sprintf("%v", value)
}
}

func (p *convertField) boolToStringNumber(b bool) string {
if b {
return "1"
}
return "0"
}

func (p *convertField) Teardown(context.Context) error {
return nil
}
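The file above converts in two steps: the current value is rendered as a string (`toString`), and that string is then parsed as the target type (`stringToType`). The standalone sketch below mirrors that flow so the edge cases are easy to try. It is an illustration only: `convert` is a hypothetical free function, and the SDK, reference resolver and config struct are left out.

```go
package main

import (
	"fmt"
	"strconv"
)

// toString renders a value as a string. Booleans become "1" or "0" when the
// target type is numeric, mirroring the special case in the diff above.
func toString(value any, typ string) string {
	switch v := value.(type) {
	case string:
		return v
	case int:
		return strconv.Itoa(v)
	case float64:
		return strconv.FormatFloat(v, 'f', -1, 64)
	case bool:
		if typ == "int" || typ == "float" {
			if v {
				return "1"
			}
			return "0"
		}
		return strconv.FormatBool(v)
	default:
		return fmt.Sprintf("%v", value)
	}
}

// convert parses the string form of value as the target type.
// Hypothetical helper combining both steps.
func convert(value any, typ string) (any, error) {
	s := toString(value, typ)
	switch typ {
	case "string":
		return s, nil
	case "int":
		n, err := strconv.Atoi(s)
		if err != nil {
			return nil, err
		}
		return n, nil
	case "float":
		f, err := strconv.ParseFloat(s, 64)
		if err != nil {
			return nil, err
		}
		return f, nil
	case "bool":
		b, err := strconv.ParseBool(s)
		if err != nil {
			return nil, err
		}
		return b, nil
	default:
		return nil, fmt.Errorf("undefined type %q", typ)
	}
}

func main() {
	cases := []struct {
		value any
		typ   string
	}{
		{"123", "int"},      // numeric string parses
		{true, "int"},       // bool is rendered as "1" first
		{123.345, "string"}, // float is rendered without an exponent
		{1, "bool"},         // "1" is accepted by strconv.ParseBool
		{1.5, "int"},        // fails: "1.5" is not an integer literal
		{"yes", "bool"},     // fails: ParseBool does not accept "yes"
	}
	for _, c := range cases {
		got, err := convert(c.value, c.typ)
		fmt.Printf("%#v -> %s: %#v (err: %v)\n", c.value, c.typ, got, err)
	}
}
```

Because every conversion goes through a string, a float with a fractional part cannot be converted to int, and only the spellings that `strconv.ParseBool` accepts can become a bool.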
140 changes: 140 additions & 0 deletions pkg/plugin/processor/builtin/convertField_examples_test.go
@@ -0,0 +1,140 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package builtin

import (
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
)

//nolint:govet // a more descriptive example description
func ExampleConvertFieldProcessor_StringToInt() {
p := newConvertField()

RunExample(p, example{
Description: `change .Key.id type to int`,
Config: map[string]string{"field": ".Key.id", "type": "int"},
Have: opencdc.Record{
Operation: opencdc.OperationUpdate,
Key: opencdc.StructuredData{"id": "123"},
Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
},
Want: sdk.SingleRecord{
Operation: opencdc.OperationUpdate,
Key: opencdc.StructuredData{"id": 123},
Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
}})

// Output:
// processor transformed record:
// --- before
// +++ after
// @@ -1,14 +1,14 @@
// {
// "position": null,
// "operation": "update",
// "metadata": null,
// "key": {
// - "id": "123"
// + "id": 123
// },
// "payload": {
// "before": null,
// "after": {
// "foo": "bar"
// }
// }
// }
}

//nolint:govet // a more descriptive example description
func ExampleConvertFieldProcessor_IntToBool() {
p := newConvertField()

RunExample(p, example{
Description: `change .Payload.After.done type to bool`,
Config: map[string]string{"field": ".Payload.After.done", "type": "bool"},
Have: opencdc.Record{
Operation: opencdc.OperationUpdate,
Key: opencdc.StructuredData{"id": "123"},
Payload: opencdc.Change{After: opencdc.StructuredData{"done": 1}},
},
Want: sdk.SingleRecord{
Operation: opencdc.OperationUpdate,
Key: opencdc.StructuredData{"id": "123"},
Payload: opencdc.Change{After: opencdc.StructuredData{"done": true}},
}})

// Output:
// processor transformed record:
// --- before
// +++ after
// @@ -1,14 +1,14 @@
// {
// "position": null,
// "operation": "update",
// "metadata": null,
// "key": {
// "id": "123"
// },
// "payload": {
// "before": null,
// "after": {
// - "done": 1
// + "done": true
// }
// }
// }
}

//nolint:govet // a more descriptive example description
func ExampleConvertFieldProcessor_FloatToString() {
p := newConvertField()

RunExample(p, example{
Description: `change .Key.id type to string`,
Config: map[string]string{"field": ".Key.id", "type": "string"},
Have: opencdc.Record{
Operation: opencdc.OperationUpdate,
Key: opencdc.StructuredData{"id": 123.345},
Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
},
Want: sdk.SingleRecord{
Operation: opencdc.OperationUpdate,
Key: opencdc.StructuredData{"id": "123.345"},
Payload: opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
}})

// Output:
// processor transformed record:
// --- before
// +++ after
// @@ -1,14 +1,14 @@
// {
// "position": null,
// "operation": "update",
// "metadata": null,
// "key": {
// - "id": 123.345
// + "id": "123.345"
// },
// "payload": {
// "before": null,
// "after": {
// "foo": "bar"
// }
// }
// }
}
33 changes: 33 additions & 0 deletions pkg/plugin/processor/builtin/convertField_paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
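The generated parameter file is not shown here. As a rough illustration of the two checks declared in the `validate` tags of `convertFieldConfig` (a required field matching a prefix pattern, and a required type drawn from a fixed list), here is a hand-written sketch. It is not paramgen's output and does not use the SDK's `ParseConfig`; `validateConfig`, `fieldPattern` and `allowedTypes` are hypothetical names, and the pattern is reduced to the prefix it enforces.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// fieldPattern requires the reference to start with .Payload or .Key.
var fieldPattern = regexp.MustCompile(`^\.(Payload|Key)`)

// allowedTypes lists the target types named in the inclusion rule.
var allowedTypes = map[string]bool{
	"string": true,
	"int":    true,
	"float":  true,
	"bool":   true,
}

// validateConfig is a hypothetical stand-in for the generated validation.
func validateConfig(cfg map[string]string) error {
	field := cfg["field"]
	if field == "" {
		return errors.New(`"field" is required`)
	}
	if !fieldPattern.MatchString(field) {
		return fmt.Errorf("field %q must start with .Payload or .Key", field)
	}
	typ := cfg["type"]
	if typ == "" {
		return errors.New(`"type" is required`)
	}
	if !allowedTypes[typ] {
		return fmt.Errorf("type %q must be one of string, int, float, bool", typ)
	}
	return nil
}

func main() {
	configs := []map[string]string{
		{"field": ".Key.id", "type": "int"},              // accepted
		{"field": ".Payload.After.done", "type": "bool"}, // accepted
		{"field": ".Metadata.foo", "type": "int"},        // rejected: wrong prefix
		{"field": ".Key.id", "type": "uuid"},             // rejected: unknown type
		{"type": "int"},                                  // rejected: field missing
	}
	for _, cfg := range configs {
		fmt.Printf("%v -> %v\n", cfg, validateConfig(cfg))
	}
}
```

Note that a prefix pattern alone only checks how the reference starts; whether the field exists and holds structured data is decided later, when the reference is resolved against a record.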