diff --git a/go.mod b/go.mod index 4920cb185..b7a77f9fe 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,7 @@ require ( github.com/jackc/pgx/v5 v5.4.2 github.com/jinzhu/copier v0.3.5 github.com/jpillora/backoff v1.0.0 - github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230605121418-82e53767f0ac + github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230630140346-bb9ce3f90f4a github.com/matryer/is v1.4.1 github.com/modern-go/reflect2 v1.0.2 github.com/piotrkowalczuk/promgrpc/v4 v4.1.0 diff --git a/go.sum b/go.sum index 1276f2e6f..4eabe48cc 100644 --- a/go.sum +++ b/go.sum @@ -484,8 +484,8 @@ github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.4 h1:SO9z7FRPzA03QhHKJrH5BXA6HU1rS4V2nIVrrNC1iYk= -github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230605121418-82e53767f0ac h1:f0RCTaThW3/D5xByrGxfvR3o95UZsrkXFVkKSY+s89w= -github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230605121418-82e53767f0ac/go.mod h1:iz9EnaFViALD6sVqxYHs8BPC0ZEQtfhTpN7SG5b0Nqo= +github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230630140346-bb9ce3f90f4a h1:TrxQUmJBE1pZsnTW3rqG5Fsx3Xz0wGm5xgqLDV/mMGk= +github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230630140346-bb9ce3f90f4a/go.mod h1:iz9EnaFViALD6sVqxYHs8BPC0ZEQtfhTpN7SG5b0Nqo= github.com/matryer/is v1.4.1 h1:55ehd8zaGABKLXQUe2awZ99BD/PTc2ls+KV/dXphgEQ= github.com/matryer/is v1.4.1/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= diff --git a/pkg/processor/schemaregistry/client.go b/pkg/processor/schemaregistry/client.go index 7e44672be..9bf9edd81 100644 --- a/pkg/processor/schemaregistry/client.go +++ b/pkg/processor/schemaregistry/client.go @@ -60,7 +60,22 @@ func (c *Client) CreateSchema(ctx context.Context, subject string, schema sr.Sch ss, err := 
c.cache.GetBySubjectText(subject, schema.Schema, func() (sr.SubjectSchema, error) { logEvent.Msg("schema cache miss") logEvent = nil // disable output for hit - return c.client.CreateSchema(ctx, subject, schema) + + // Check if the subject exists. The error is ignored on purpose: if the + // lookup fails (e.g. 404 for an unknown subject) no versions are reported and the subject is treated as new. + versions, _ := c.client.SubjectVersions(ctx, subject, sr.ShowDeleted) + subjectExists := len(versions) > 0 + + ss, err := c.client.CreateSchema(ctx, subject, schema) + if err != nil { + return ss, err + } + + if !subjectExists { + // we just created the subject, so we need to disable compatibility checks + c.client.SetCompatibilityLevel(ctx, sr.CompatNone, subject) + } + return ss, nil }) if err != nil { return sr.SubjectSchema{}, cerrors.Errorf("failed to create schema with subject %q: %w", subject, err) diff --git a/pkg/processor/schemaregistry/client_fake_test.go b/pkg/processor/schemaregistry/client_fake_test.go index acbb2884a..43f99a950 100644 --- a/pkg/processor/schemaregistry/client_fake_test.go +++ b/pkg/processor/schemaregistry/client_fake_test.go @@ -198,7 +198,6 @@ func (fr *fakeRegistry) findBySubjectVersion(subject string, version int) (sr.Su // fakeServer is a fake schema registry server.
type fakeServer struct { - mux http.ServeMux fr fakeRegistry logf func(format string, args ...any) } @@ -210,43 +209,75 @@ func newFakeServer(logf func(format string, args ...any)) *fakeServer { if logf != nil { fs.logf = logf } - fs.mux.Handle("/schemas/ids/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - tokens := strings.Split(r.URL.EscapedPath(), "/") - switch { - case len(tokens) == 4: - fs.schemaByID(w, r) - case len(tokens) == 5 && tokens[4] == "versions": - fs.subjectVersionsByID(w, r) - default: - http.NotFound(w, r) - } - })) - fs.mux.Handle("/subjects/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - tokens := strings.Split(r.URL.EscapedPath(), "/") - switch { - case len(tokens) == 4 && tokens[3] == "versions": - fs.createSchema(w, r) - case len(tokens) == 5 && tokens[3] == "versions": - fs.schemaBySubjectVersion(w, r) - default: - http.NotFound(w, r) - } - })) return fs } func (fs *fakeServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { fs.logf("%s %s", r.Method, r.RequestURI) - fs.mux.ServeHTTP(w, r) -} -func (fs *fakeServer) createSchema(w http.ResponseWriter, r *http.Request) { - // POST /subjects/{subject}/versions => returns ID - if r.Method != http.MethodPost { + var ( + id int + subject string + version int + ) + p := r.URL.Path + switch { + case fs.match(p, "/schemas/ids/+", &id) && r.Method == http.MethodGet: + fs.schemaByID(w, r, id) + case fs.match(p, "/schemas/ids/+/versions", &id) && r.Method == http.MethodGet: + fs.subjectVersionsByID(w, r, id) + case fs.match(p, "/subjects/+/versions", &subject) && r.Method == http.MethodPost: + fs.createSchema(w, r, subject) + case fs.match(p, "/subjects/+/versions/+", &subject, &version) && r.Method == http.MethodGet: + fs.schemaBySubjectVersion(w, r, subject, version) + case fs.match(p, "/config/+", &subject) && r.Method == http.MethodPut: + fs.updateConfig(w, r) + default: http.NotFound(w, r) - return } +} + +// match reports whether path matches the 
given pattern, which is a +// path with '+' wildcards wherever you want to use a parameter. Path +// parameters are assigned to the pointers in vars (len(vars) must be +// the number of wildcards), which must be of type *string or *int. +// Source: https://github.com/benhoyt/go-routing/blob/master/match/route.go +func (*fakeServer) match(path, pattern string, vars ...interface{}) bool { + for ; pattern != "" && path != ""; pattern = pattern[1:] { + switch pattern[0] { + case '+': + // '+' matches till next slash in path + slash := strings.IndexByte(path, '/') + if slash < 0 { + slash = len(path) + } + segment := path[:slash] + path = path[slash:] + switch p := vars[0].(type) { + case *string: + *p = segment + case *int: + n, err := strconv.Atoi(segment) + if err != nil || n < 0 { + return false + } + *p = n + default: + panic("vars must be *string or *int") + } + vars = vars[1:] + case path[0]: + // non-'+' pattern byte must match path byte + path = path[1:] + default: + return false + } + } + return path == "" && pattern == "" +} +func (fs *fakeServer) createSchema(w http.ResponseWriter, r *http.Request, subject string) { + // POST /subjects/{subject}/versions => returns ID defer r.Body.Close() var s sr.Schema err := json.NewDecoder(r.Body).Decode(&s) @@ -255,26 +286,13 @@ func (fs *fakeServer) createSchema(w http.ResponseWriter, r *http.Request) { return } - tokens := strings.Split(r.URL.EscapedPath(), "/") - ss := fs.fr.CreateSchema(tokens[2], s) + ss := fs.fr.CreateSchema(subject, s) fs.json(w, map[string]any{"id": ss.ID}) } -func (fs *fakeServer) schemaBySubjectVersion(w http.ResponseWriter, r *http.Request) { +func (fs *fakeServer) schemaBySubjectVersion(w http.ResponseWriter, _ *http.Request, subject string, version int) { // GET /subjects/{subject}/versions/{version} - if r.Method != http.MethodGet { - http.NotFound(w, r) - return - } - - tokens := strings.Split(r.URL.EscapedPath(), "/") - version, err := strconv.Atoi(tokens[4]) - if err != nil { - 
fs.error(w, http.StatusInternalServerError, cerrors.Errorf("invalid schema version: %w", err)) - return - } - - ss, ok := fs.fr.SchemaBySubjectVersion(tokens[2], version) + ss, ok := fs.fr.SchemaBySubjectVersion(subject, version) if !ok { fs.errorWithCode(w, http.StatusNotFound, errorCodeSubjectNotFound, cerrors.New("subject not found")) return @@ -282,20 +300,8 @@ func (fs *fakeServer) schemaBySubjectVersion(w http.ResponseWriter, r *http.Requ fs.json(w, ss) } -func (fs *fakeServer) schemaByID(w http.ResponseWriter, r *http.Request) { +func (fs *fakeServer) schemaByID(w http.ResponseWriter, _ *http.Request, id int) { // GET /schemas/ids/{id} - if r.Method != http.MethodGet { - http.NotFound(w, r) - return - } - - tokens := strings.Split(r.URL.EscapedPath(), "/") - id, err := strconv.Atoi(tokens[3]) - if err != nil { - fs.error(w, http.StatusInternalServerError, cerrors.Errorf("invalid schema ID: %w", err)) - return - } - s, ok := fs.fr.SchemaByID(id) if !ok { fs.errorWithCode(w, http.StatusNotFound, errorCodeSchemaNotFound, cerrors.New("schema not found")) @@ -304,22 +310,38 @@ func (fs *fakeServer) schemaByID(w http.ResponseWriter, r *http.Request) { fs.json(w, s) } -func (fs *fakeServer) subjectVersionsByID(w http.ResponseWriter, r *http.Request) { +func (fs *fakeServer) subjectVersionsByID(w http.ResponseWriter, _ *http.Request, id int) { // GET /schemas/ids/{id}/versions - if r.Method != http.MethodGet { - http.NotFound(w, r) - return - } + sss := fs.fr.SubjectVersionsByID(id) + fs.json(w, sss) +} - tokens := strings.Split(r.URL.EscapedPath(), "/") - id, err := strconv.Atoi(tokens[3]) +func (fs *fakeServer) updateConfig(w http.ResponseWriter, r *http.Request) { + // PUT /config/{subject} + defer r.Body.Close() + var c struct { + Compatibility string `json:"compatibility"` + } + err := json.NewDecoder(r.Body).Decode(&c) if err != nil { - fs.error(w, http.StatusInternalServerError, cerrors.Errorf("invalid schema ID: %w", err)) + fs.error(w, 
http.StatusInternalServerError, err) return } - sss := fs.fr.SubjectVersionsByID(id) - fs.json(w, sss) + valid := map[string]bool{ + "BACKWARD": true, + "BACKWARD_TRANSITIVE": true, + "FORWARD": true, + "FORWARD_TRANSITIVE": true, + "FULL": true, + "FULL_TRANSITIVE": true, + "NONE": true, + }[c.Compatibility] + if !valid { + fs.errorWithCode(w, http.StatusUnprocessableEntity, 42203, cerrors.New("invalid compatibility level")) + return + } + fs.json(w, c) } func (fs *fakeServer) json(w http.ResponseWriter, v any) { diff --git a/pkg/processor/schemaregistry/client_test.go b/pkg/processor/schemaregistry/client_test.go index 843813d72..b1bc8a7f9 100644 --- a/pkg/processor/schemaregistry/client_test.go +++ b/pkg/processor/schemaregistry/client_test.go @@ -190,25 +190,37 @@ func TestClient_CacheHit(t *testing.T) { }) is.NoErr(err) - is.Equal(len(rtr.Records()), 3) + is.Equal(len(rtr.Records()), 5) rtr.AssertRecord(is, 0, + assertMethod("GET"), + assertRequestURI("/subjects/test-cache-hit/versions?deleted=true"), + assertResponseStatus(404), + assertError(nil), + ) + rtr.AssertRecord(is, 1, assertMethod("POST"), assertRequestURI("/subjects/test-cache-hit/versions"), assertResponseStatus(200), assertError(nil), ) - rtr.AssertRecord(is, 1, + rtr.AssertRecord(is, 2, assertMethod("GET"), assertRequestURI(fmt.Sprintf("/schemas/ids/%d/versions", want.ID)), assertResponseStatus(200), assertError(nil), ) - rtr.AssertRecord(is, 2, + rtr.AssertRecord(is, 3, assertMethod("GET"), assertRequestURI("/subjects/test-cache-hit/versions/1"), assertResponseStatus(200), assertError(nil), ) + rtr.AssertRecord(is, 4, + assertMethod("PUT"), + assertRequestURI("/config/test-cache-hit?defaultToGlobal=true"), + assertResponseStatus(200), + assertError(nil), + ) rtr.Clear() // clear requests before subtests diff --git a/pkg/processor/schemaregistry/decoder.go b/pkg/processor/schemaregistry/decoder.go new file mode 100644 index 000000000..14dbd2165 --- /dev/null +++
b/pkg/processor/schemaregistry/decoder.go @@ -0,0 +1,90 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schemaregistry + +import ( + "context" + + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/record" + "github.com/lovromazgon/franz-go/pkg/sr" +) + +type Decoder struct { + client *Client + serde *sr.Serde + logger log.CtxLogger +} + +func NewDecoder(client *Client, logger log.CtxLogger, serde *sr.Serde) *Decoder { + return &Decoder{ + client: client, + serde: serde, + logger: logger.WithComponent("schemaregistry.Decoder"), + } +} + +func (d *Decoder) Decode(ctx context.Context, b record.RawData) (record.StructuredData, error) { + var out record.StructuredData + err := d.serde.Decode(b.Raw, &out) + if cerrors.Is(err, sr.ErrNotRegistered) { + err = d.findAndRegisterSchema(ctx, b) + if err != nil { + return nil, err + } + // retry decoding + err = d.serde.Decode(b.Raw, &out) + } + if err != nil { + return nil, cerrors.Errorf("failed to decode raw data: %w", err) + } + + return out, nil +} + +func (d *Decoder) findAndRegisterSchema(ctx context.Context, b record.RawData) error { + id, _, _ := d.serde.Header().DecodeID(b.Raw) // we know this won't throw an error since Decode didn't return ErrBadHeader + s, err := d.client.SchemaByID(ctx, id) + if err != nil { + return cerrors.Errorf("failed to get schema: %w", err) + } + 
sf, ok := DefaultSchemaFactories[s.Type] + if !ok { + return cerrors.Errorf("unknown schema type %q (%d)", s.Type.String(), s.Type) + } + schema, err := sf.Parse(s.Schema) + if err != nil { + return cerrors.Errorf("failed to parse schema: %w", err) + } + + d.serde.Register( + id, + record.StructuredData{}, + sr.EncodeFn(encodeFn(schema, sr.SubjectSchema{ID: id})), + sr.DecodeFn(decodeFn(schema, sr.SubjectSchema{ID: id})), + ) + return nil +} + +func decodeFn(schema Schema, ss sr.SubjectSchema) func(b []byte, a any) error { + return func(b []byte, a any) error { + err := schema.Unmarshal(b, a) + if err != nil { + return cerrors.Errorf("failed to unmarshal data with schema (ID: %v, subject: %v, version: %v): %w", ss.ID, ss.Subject, ss.Version, err) + } + return nil + } +} diff --git a/pkg/processor/schemaregistry/encoder.go b/pkg/processor/schemaregistry/encoder.go new file mode 100644 index 000000000..9361699e5 --- /dev/null +++ b/pkg/processor/schemaregistry/encoder.go @@ -0,0 +1,133 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package schemaregistry + +import ( + "context" + + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/record" + "github.com/lovromazgon/franz-go/pkg/sr" +) + +type Encoder struct { + client *Client + serde *sr.Serde + logger log.CtxLogger + + SchemaStrategy +} + +type SchemaStrategy interface { + GetSchema(context.Context, *Client, log.CtxLogger, record.StructuredData) (Schema, sr.SubjectSchema, error) +} + +func NewEncoder(client *Client, logger log.CtxLogger, serde *sr.Serde, strategy SchemaStrategy) *Encoder { + return &Encoder{ + client: client, + serde: serde, + logger: logger.WithComponent("schemaregistry.Encoder"), + SchemaStrategy: strategy, + } +} + +func (e *Encoder) Encode(ctx context.Context, sd record.StructuredData) (record.RawData, error) { + s, ss, err := e.GetSchema(ctx, e.client, e.logger, sd) + if err != nil { + return record.RawData{}, cerrors.Errorf("failed to get schema: %w", err) + } + + b, err := e.serde.Encode(sd, sr.ID(ss.ID)) + if cerrors.Is(err, sr.ErrNotRegistered) { + // TODO note that we need to register specific indexes when adding support for protobuf + e.serde.Register( + ss.ID, + record.StructuredData{}, + sr.EncodeFn(encodeFn(s, ss)), + sr.DecodeFn(decodeFn(s, ss)), + ) + + // try to encode again + b, err = e.serde.Encode(sd, sr.ID(ss.ID)) + } + if err != nil { + return record.RawData{}, cerrors.Errorf("failed to encode data: %w", err) + } + return record.RawData{Raw: b}, nil +} + +type ExtractAndUploadSchemaStrategy struct { + Type sr.SchemaType + Subject string +} + +func (str ExtractAndUploadSchemaStrategy) GetSchema(ctx context.Context, client *Client, _ log.CtxLogger, sd record.StructuredData) (Schema, sr.SubjectSchema, error) { + sf, ok := DefaultSchemaFactories[str.Type] + if !ok { + return nil, sr.SubjectSchema{}, cerrors.Errorf("unknown schema type %q (%d)", str.Type.String(), str.Type) + } + + s, err := sf.SchemaForType(sd) + 
if err != nil { + return nil, sr.SubjectSchema{}, cerrors.Errorf("could not extract avro schema: %w", err) + } + + ss, err := client.CreateSchema(ctx, str.Subject, sr.Schema{ + Schema: s.String(), + Type: str.Type, + References: nil, + }) + if err != nil { + return nil, sr.SubjectSchema{}, cerrors.Errorf("could not create schema: %w", err) + } + + return s, ss, nil +} + +type DownloadSchemaStrategy struct { + Subject string + // TODO add support for specifying "latest" - https://github.com/ConduitIO/conduit/issues/1095 + Version int +} + +func (str DownloadSchemaStrategy) GetSchema(ctx context.Context, client *Client, _ log.CtxLogger, _ record.StructuredData) (Schema, sr.SubjectSchema, error) { + // fetch the schema registered under the configured subject and version + ss, err := client.SchemaBySubjectVersion(ctx, str.Subject, str.Version) + if err != nil { + return nil, sr.SubjectSchema{}, cerrors.Errorf("could not fetch schema with subject %q and version %d: %w", str.Subject, str.Version, err) + } + + sf, ok := DefaultSchemaFactories[ss.Type] + if !ok { + return nil, sr.SubjectSchema{}, cerrors.Errorf("unknown schema type %q (%d)", ss.Type.String(), ss.Type) + } + + s, err := sf.Parse(ss.Schema.Schema) + if err != nil { + return nil, sr.SubjectSchema{}, err + } + return s, ss, nil +} + +func encodeFn(schema Schema, ss sr.SubjectSchema) func(v any) ([]byte, error) { + return func(v any) ([]byte, error) { + b, err := schema.Marshal(v) + if err != nil { + return nil, cerrors.Errorf("failed to marshal data with schema (ID: %v, subject: %v, version: %v): %w", ss.ID, ss.Subject, ss.Version, err) + } + return b, nil + } +} diff --git a/pkg/processor/schemaregistry/encoder_test.go b/pkg/processor/schemaregistry/encoder_test.go new file mode 100644 index 000000000..9200a0d19 --- /dev/null +++ b/pkg/processor/schemaregistry/encoder_test.go @@ -0,0 +1,129 @@ +// Copyright © 2023 Meroxa, Inc.
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schemaregistry + +import ( + "context" + "testing" + + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/record" + "github.com/lovromazgon/franz-go/pkg/sr" + "github.com/matryer/is" +) + +func TestEncodeDecode_ExtractAndUploadSchemaStrategy(t *testing.T) { + is := is.New(t) + ctx := context.Background() + logger := log.Nop() + + var serde sr.Serde + client, err := NewClient(logger, sr.URLs(testSchemaRegistryURL(t))) + is.NoErr(err) + + have := record.StructuredData{ + "myString": "bar", + "myInt": 1, + "myFloat": 2.3, + "myMap": map[string]any{ + "foo": true, + "bar": 2.2, + }, + "myStruct": record.StructuredData{ + "foo": 1, + "bar": false, + }, + "mySlice": []int{1, 2, 3}, + } + want := record.StructuredData{ + "myString": "bar", + "myInt": 1, + "myFloat": 2.3, + "myMap": map[string]any{ + "foo": true, + "bar": 2.2, + }, + "myStruct": map[string]any{ // records are unmarshaled into a map + "foo": 1, + "bar": false, + }, + "mySlice": []any{1, 2, 3}, // slice without type + } + + for schemaType := range DefaultSchemaFactories { + t.Run(schemaType.String(), func(t *testing.T) { + is := is.New(t) + enc := NewEncoder(client, logger, &serde, ExtractAndUploadSchemaStrategy{ + Type: schemaType, + Subject: "test1" + schemaType.String(), + }) + dec := NewDecoder(client, logger, &serde) + + bytes, err := enc.Encode(ctx, have) + is.NoErr(err) + + got, err := dec.Decode(ctx, 
bytes) + is.NoErr(err) + + is.Equal(want, got) + }) + } +} + +func TestEncodeDecode_DownloadStrategy_Avro(t *testing.T) { + is := is.New(t) + ctx := context.Background() + logger := log.Nop() + + var serde sr.Serde + client, err := NewClient(logger, sr.URLs(testSchemaRegistryURL(t))) + is.NoErr(err) + + have := record.StructuredData{ + "myString": "bar", + "myInt": 1, + } + want := record.StructuredData{ + "myString": "bar", + "myInt": 1, + } + ss, err := client.CreateSchema(ctx, "test2", sr.Schema{ + Type: sr.TypeAvro, + Schema: ` +{ + "type":"record", + "name":"record", + "fields":[ + {"name":"myString","type":"string"}, + {"name":"myInt","type":"int"} + ] +}`, + }) + is.NoErr(err) + + enc := NewEncoder(client, logger, &serde, DownloadSchemaStrategy{ + Subject: ss.Subject, + Version: ss.Version, + }) + dec := NewDecoder(client, logger, &serde) + + bytes, err := enc.Encode(ctx, have) + is.NoErr(err) + + got, err := dec.Decode(ctx, bytes) + is.NoErr(err) + + is.Equal(want, got) +} diff --git a/pkg/processor/schemaregistry/schema.go b/pkg/processor/schemaregistry/schema.go new file mode 100644 index 000000000..69b276d86 --- /dev/null +++ b/pkg/processor/schemaregistry/schema.go @@ -0,0 +1,45 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package schemaregistry + +import ( + "github.com/conduitio/conduit/pkg/processor/schemaregistry/avro" + "github.com/lovromazgon/franz-go/pkg/sr" +) + +type Schema interface { + // Marshal returns the encoded representation of v. + Marshal(v any) ([]byte, error) + // Unmarshal parses encoded data and stores the result in the value pointed + // to by v. If v is nil or not a pointer, Unmarshal returns an error. + Unmarshal(b []byte, v any) error + // String returns the textual representation of the schema. + String() string +} + +type SchemaFactory struct { + // Parse takes the textual representation of the schema and parses it into + // a Schema. + Parse func(string) (Schema, error) + // SchemaForType returns a Schema that matches the structure of v. + SchemaForType func(v any) (Schema, error) +} + +var DefaultSchemaFactories = map[sr.SchemaType]SchemaFactory{ + avro.Type: { + Parse: func(s string) (Schema, error) { return avro.Parse(s) }, + SchemaForType: func(v any) (Schema, error) { return avro.SchemaForType(v) }, + }, +}