diff --git a/pkg/processor/procbuiltin/unwrap.go b/pkg/processor/procbuiltin/unwrap.go index 4449c5994..1d4f292bd 100644 --- a/pkg/processor/procbuiltin/unwrap.go +++ b/pkg/processor/procbuiltin/unwrap.go @@ -43,6 +43,7 @@ const ( FormatDebezium = "debezium" FormatKafkaConnect = "kafka-connect" + FormatOpenCDC = "opencdc" ) func init() { @@ -60,6 +61,8 @@ func Unwrap(config processor.Config) (processor.Interface, error) { proc.unwrapper = &debeziumUnwrapper{} case FormatKafkaConnect: proc.unwrapper = &kafkaConnectUnwrapper{} + case FormatOpenCDC: + proc.unwrapper = &openCDCUnwrapper{} default: return nil, cerrors.Errorf("%s: %q is not a valid format", unwrapProcType, format) } @@ -93,6 +96,180 @@ func (p *unwrapProcessor) Process(_ context.Context, in record.Record) (record.R return out, nil } +/* +Example of an OpenCDC record: +{ + "key": "NWQ0N2UwZGQtNTkxYi00MGEyLTk3YzMtYzc1MDY0MWU3NTc1", + "metadata": { + "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", + "opencdc.readAt": "1706028881541916000", + "opencdc.version": "v1" + }, + "operation": "create", + "payload": { + "after": { + "event_id": 2041181862, + "msg": "string 4c88f20f-aa77-4f4b-9354-e4fdb1989a52", + "pg_generator": false, + "sensor_id": 54434691, + "triggered": false + }, + "before": null + }, + "position": "ZWIwNmJiMmMtNWNhMS00YjUyLWE2ZmMtYzc0OTFlZDQ3OTYz" +} +*/ + +// openCDCUnwrapper unwraps an OpenCDC record from the payload, by unmarhsalling rec.Payload.After into type Record. +type openCDCUnwrapper struct{} + +// UnwrapOperation extracts operation from a structuredData record. +func (o *openCDCUnwrapper) UnwrapOperation(structData record.StructuredData) (record.Operation, error) { + var operation record.Operation + op, ok := structData["operation"] + if !ok { + return operation, cerrors.Errorf("record payload after doesn't contain operation") + } + + switch opType := op.(type) { + case record.Operation: + operation = opType + case string: + if err := operation.UnmarshalText([]byte(opType)); err != nil { + return operation, cerrors.Errorf("couldn't unmarshal record operation") + } + default: + return operation, cerrors.Errorf("expected a record.Operation or a string, got %T", opType) + } + return operation, nil +} + +// UnwrapMetadata extracts metadata from a structuredData record. +func (o *openCDCUnwrapper) UnwrapMetadata(structData record.StructuredData) (record.Metadata, error) { + var metadata record.Metadata + meta, ok := structData["metadata"] + if !ok { + return metadata, cerrors.Errorf("record payload after doesn't contain metadata") + } + + switch m := meta.(type) { + case record.Metadata: + metadata = m + case map[string]interface{}: + metadata = make(record.Metadata, len(m)) + for k, v := range m { + metadata[k] = fmt.Sprint(v) + } + default: + return metadata, cerrors.Errorf("expected a record.Metadata or a map[string]interface{}, got %T", m) + } + return metadata, nil +} + +// UnwrapKey extracts key from a structuredData record. +func (o *openCDCUnwrapper) UnwrapKey(structData record.StructuredData) (record.Data, error) { + var key record.Data + ky, ok := structData["key"] + if !ok { + return key, cerrors.Errorf("record payload after doesn't contain key") + } + + switch k := ky.(type) { + case record.Data: + key = k + case string: + key = record.RawData{Raw: []byte(k)} + default: + return key, cerrors.Errorf("expected a record.Data or a string, got %T", k) + } + + return key, nil +} + +// UnwrapPayload extracts payload from a structuredData record. +func (o *openCDCUnwrapper) UnwrapPayload(structData record.StructuredData) (record.Change, error) { + var payload record.Change + pl, ok := structData["payload"] + if !ok { + return payload, cerrors.Errorf("record payload doesn't contain payload") + } + + switch p := pl.(type) { + case record.Change: + payload = p + case map[string]interface{}: + afterData, ok := p["after"] + if !ok { + return payload, cerrors.Errorf("record payload after doesn't contain payload.after") + } + + data, ok := afterData.(map[string]interface{}) + if !ok { + return payload, cerrors.Errorf("record payload after payload.after is not a map") + } + + convertedData := make(record.StructuredData, len(data)) + for k, v := range data { + convertedData[k] = v + } + + payload = record.Change{ + Before: nil, + After: convertedData, + } + default: + return payload, cerrors.Errorf("expected a record.Change or a map[string]interface{}, got %T", p) + } + return payload, nil +} + +// Unwrap replaces the whole record.payload with record.payload.after.payload except position. +func (o *openCDCUnwrapper) Unwrap(rec record.Record) (record.Record, error) { + var structData record.StructuredData + data := rec.Payload.After + switch d := data.(type) { + case record.RawData: + // unmarshal raw data to structured + if err := json.Unmarshal(data.Bytes(), &structData); err != nil { + return record.Record{}, cerrors.Errorf("failed to unmarshal raw data as JSON: %w", unwrapProcType, err) + } + case record.StructuredData: + structData = d + default: + return record.Record{}, cerrors.Errorf("unexpected data type %T", unwrapProcType, data) + } + + operation, err := o.UnwrapOperation(structData) + if err != nil { + return record.Record{}, err + } + + metadata, err := o.UnwrapMetadata(structData) + if err != nil { + return record.Record{}, err + } + + key, err := o.UnwrapKey(structData) + if err != nil { + return record.Record{}, err + } + + payload, err := o.UnwrapPayload(structData) + if err != nil { + return record.Record{}, err + } + + // Position is the only key we preserve from the original record to maintain the reference respect other messages + // that will be coming from in the event of chaining pipelines (e.g.: source -> kafka, kafka -> destination) + return record.Record{ + Key: key, + Position: rec.Position, + Metadata: metadata, + Payload: payload, + Operation: operation, + }, nil +} + /* Example of a kafka-connect record: { @@ -186,7 +363,6 @@ Example of a debezium record: "schema": {} // will be ignored } */ - // debeziumUnwrapper unwraps a debezium record from the payload. type debeziumUnwrapper struct { kafkaConnectUnwrapper kafkaConnectUnwrapper diff --git a/pkg/processor/procbuiltin/unwrap_test.go b/pkg/processor/procbuiltin/unwrap_test.go index 513fe3180..3a1926d48 100644 --- a/pkg/processor/procbuiltin/unwrap_test.go +++ b/pkg/processor/procbuiltin/unwrap_test.go @@ -24,7 +24,7 @@ import ( "github.com/matryer/is" ) -const DebeziumRecord = `{ +const DebeziumRecordPayload = `{ "payload": { "after": { "description": "test1", @@ -42,6 +42,26 @@ const DebeziumRecord = `{ "schema": {} }` +const OpenCDCRecordPayload = `{ + "position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0", + "operation": "create", + "metadata": { + "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", + "opencdc.readAt": "1706028953595546000", + "opencdc.version": "v1" + }, + "key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh", + "payload": { + "after": { + "event_id": 1747353650, + "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", + "pg_generator": false, + "sensor_id": 1250383582, + "triggered": false + } + } + }` + func TestUnwrap_Config(t *testing.T) { tests := []struct { name string @@ -108,7 +128,7 @@ func TestUnwrap_Process(t *testing.T) { Payload: record.Change{ Before: nil, After: record.RawData{ - Raw: []byte(DebeziumRecord), + Raw: []byte(DebeziumRecordPayload), }, }, }, @@ -401,6 +421,300 @@ func TestUnwrap_Process(t *testing.T) { }, wantErr: false, }, + { + name: "opencdc with structured data and no payload after", + config: processor.Config{ + Settings: map[string]string{"format": "opencdc"}, + }, + record: record.Record{ + Key: record.RawData{Raw: []byte("one-key")}, + Operation: record.OperationCreate, + Metadata: map[string]string{}, + Payload: record.Change{ + Before: nil, + After: nil, + }, + Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), + }, + want: record.Record{}, + wantErr: true, + }, + { + name: "opencdc with an invalid operation", + config: processor.Config{ + Settings: map[string]string{"format": "opencdc"}, + }, + record: record.Record{ + Key: record.RawData{Raw: []byte("one-key-raw-data")}, + Operation: record.OperationCreate, + Metadata: map[string]string{ + "conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka", + "kafka.topic": "stream-78lpnchx7tzpyqz-generator", + "opencdc.createdAt": "1706028953595000000", + "opencdc.readAt": "1706028953606997000", + "opencdc.version": "v1", + }, + Payload: record.Change{ + Before: nil, + After: record.RawData{ + Raw: []byte(`{ + "position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0", + "operation": "invalid", + "metadata": { + "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", + "opencdc.readAt": "1706028953595546000", + "opencdc.version": "v1" + }, + "key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh", + "payload": { + "after": { + "event_id": 1747353650, + "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", + "pg_generator": false, + "sensor_id": 1250383582, + "triggered": false + } + } + }`, + ), + }, + }, + Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), + }, + want: record.Record{}, + wantErr: true, + }, + { + name: "opencdc with an invalid metadata", + config: processor.Config{ + Settings: map[string]string{"format": "opencdc"}, + }, + record: record.Record{ + Key: record.RawData{Raw: []byte("one-key-raw-data")}, + Operation: record.OperationCreate, + Metadata: map[string]string{ + "conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka", + "kafka.topic": "stream-78lpnchx7tzpyqz-generator", + "opencdc.createdAt": "1706028953595000000", + "opencdc.readAt": "1706028953606997000", + "opencdc.version": "v1", + }, + Payload: record.Change{ + Before: nil, + After: record.RawData{ + Raw: []byte(`{ + "position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0", + "operation": "create", + "metadata": "invalid", + "key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh", + "payload": { + "after": { + "event_id": 1747353650, + "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", + "pg_generator": false, + "sensor_id": 1250383582, + "triggered": false + } + } + }`, + ), + }, + }, + Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), + }, + want: record.Record{}, + wantErr: true, + }, + { + name: "opencdc with an invalid key", + config: processor.Config{ + Settings: map[string]string{"format": "opencdc"}, + }, + record: record.Record{ + Key: record.RawData{Raw: []byte("one-key-raw-data")}, + Operation: record.OperationCreate, + Metadata: map[string]string{ + "conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka", + "kafka.topic": "stream-78lpnchx7tzpyqz-generator", + "opencdc.createdAt": "1706028953595000000", + "opencdc.readAt": "1706028953606997000", + "opencdc.version": "v1", + }, + Payload: record.Change{ + Before: nil, + After: record.RawData{ + Raw: []byte(`{ + "position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0", + "operation": "create", + "metadata": { + "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", + "opencdc.readAt": "1706028953595546000", + "opencdc.version": "v1" + }, + "key": 1, + "payload": { + "after": { + "event_id": 1747353650, + "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", + "pg_generator": false, + "sensor_id": 1250383582, + "triggered": false + } + } + }`, + ), + }, + }, + Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), + }, + want: record.Record{}, + wantErr: true, + }, + { + name: "opencdc with an invalid payload", + config: processor.Config{ + Settings: map[string]string{"format": "opencdc"}, + }, + record: record.Record{ + Key: record.RawData{Raw: []byte("one-key-raw-data")}, + Operation: record.OperationCreate, + Metadata: map[string]string{ + "conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka", + "kafka.topic": "stream-78lpnchx7tzpyqz-generator", + "opencdc.createdAt": "1706028953595000000", + "opencdc.readAt": "1706028953606997000", + "opencdc.version": "v1", + }, + Payload: record.Change{ + Before: nil, + After: record.RawData{ + Raw: []byte(`{ + "position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0", + "operation": "create", + "metadata": { + "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", + "opencdc.readAt": "1706028953595546000", + "opencdc.version": "v1" + }, + "key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh", + }`, + ), + }, + }, + Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), + }, + want: record.Record{}, + wantErr: true, + }, + { + name: "opencdc with structured data", + config: processor.Config{ + Settings: map[string]string{"format": "opencdc"}, + }, + record: record.Record{ + Key: record.RawData{Raw: []byte("one-key")}, + Operation: record.OperationCreate, + Metadata: map[string]string{ + "conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka", + "kafka.topic": "stream-78lpnchx7tzpyqz-generator", + "opencdc.createdAt": "1706028953595000000", + "opencdc.readAt": "1706028953606997000", + "opencdc.version": "v1", + }, + Payload: record.Change{ + Before: nil, + After: record.StructuredData{ + "position": []byte("NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0"), + "operation": record.OperationCreate, + "metadata": record.Metadata{ + "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", + "opencdc.readAt": "1706028953595546000", + "opencdc.version": "v1", + }, + "key": record.RawData{ + Raw: []byte("MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh"), + }, + "payload": record.Change{ + Before: nil, + After: record.StructuredData{ + "event_id": 1747353650, + "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", + "pg_generator": false, + "sensor_id": 1250383582, + "triggered": false, + }, + }, + }, + }, + Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), + }, + want: record.Record{ + Operation: record.OperationCreate, + Metadata: record.Metadata{ + "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", + "opencdc.readAt": "1706028953595546000", + "opencdc.version": "v1", + }, + Payload: record.Change{ + Before: nil, + After: record.StructuredData{ + "event_id": 1747353650, + "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", + "pg_generator": false, + "sensor_id": 1250383582, + "triggered": false, + }, + }, + Key: record.RawData{Raw: []byte("MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh")}, + Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), + }, + wantErr: false, + }, + { + name: "opencdc with raw data", + config: processor.Config{ + Settings: map[string]string{"format": "opencdc"}, + }, + record: record.Record{ + Key: record.RawData{Raw: []byte("one-key-raw-data")}, + Operation: record.OperationCreate, + Metadata: map[string]string{ + "conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka", + "kafka.topic": "stream-78lpnchx7tzpyqz-generator", + "opencdc.createdAt": "1706028953595000000", + "opencdc.readAt": "1706028953606997000", + "opencdc.version": "v1", + }, + Payload: record.Change{ + Before: nil, + After: record.RawData{ + Raw: []byte(OpenCDCRecordPayload), + }, + }, + Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), + }, + want: record.Record{ + Operation: record.OperationCreate, + Metadata: record.Metadata{ + "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", + "opencdc.readAt": "1706028953595546000", + "opencdc.version": "v1", + }, + Payload: record.Change{ + Before: nil, + After: record.StructuredData{ + "event_id": float64(1747353650), + "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", + "pg_generator": false, + "sensor_id": float64(1250383582), + "triggered": false, + }, + }, + Key: record.RawData{Raw: []byte("MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh")}, + Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), + }, + wantErr: false, + }, } for _, tt := range tests { @@ -413,6 +727,7 @@ func TestUnwrap_Process(t *testing.T) { if (err != nil) != tt.wantErr { t.Fatalf("process() error = %v, wantErr = %v", err, tt.wantErr) } + if diff := cmp.Diff(tt.want, got); diff != "" { t.Errorf("process() diff = %s", diff) }