Skip to content

Commit

Permalink
Adapter to support multiple schema registries
Browse files Browse the repository at this point in the history
  • Loading branch information
alok87 committed Dec 17, 2020
1 parent 9e466db commit 28c0e12
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 71 deletions.
16 changes: 8 additions & 8 deletions pkg/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import (
"github.com/Shopify/sarama"
"github.com/linkedin/goavro/v2"
"github.com/practo/klog/v2"
"github.com/practo/tipoca-stream/redshiftsink/pkg/serializer"
"github.com/riferrei/srclient"
"github.com/practo/tipoca-stream/redshiftsink/pkg/schemaregistry"
"strings"
"time"
)

type AvroProducer struct {
producer sarama.SyncProducer
srclient *srclient.SchemaRegistryClient
registry schemaregistry.SchemaRegistry
}

func NewAvroProducer(brokers []string,
Expand Down Expand Up @@ -42,7 +41,7 @@ func NewAvroProducer(brokers []string,

return &AvroProducer{
producer: producer,
srclient: srclient.CreateSchemaRegistryClient(schemaRegistryURL),
registry: schemaregistry.NewRegistry(schemaRegistryURL),
}, nil
}

Expand All @@ -55,12 +54,13 @@ func (c *AvroProducer) CreateSchema(
schemeStr := strings.ReplaceAll(scheme, "\n", "")
schemeStr = strings.ReplaceAll(schemeStr, " ", "")

schema, err := serializer.GetLatestSchemaWithRetry(
c.srclient, topic, false, 10)
schema, err := schemaregistry.GetLatestSchemaWithRetry(
c.registry, topic, false, 10,
)
if schema == nil || schema.Schema() != schemeStr {
klog.V(2).Infof("Creating schema version. topic: %s", topic)
schema, err = c.srclient.CreateSchema(
topic, scheme, srclient.Avro, false,
schema, err = c.registry.CreateSchema(
topic, scheme, schemaregistry.Avro, false,
)
if err != nil {
return 0, false, err
Expand Down
159 changes: 159 additions & 0 deletions pkg/schemaregistry/schemaregistry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package schemaregistry

import (
"fmt"
"github.com/linkedin/goavro/v2"
"github.com/practo/klog/v2"
"github.com/riferrei/srclient"
"math/rand"
"time"
)

// SchemaRegistry supports multiple schema registries.
// An adapter interface to support various schema registries out there!
// at present it supports only one(cSchemaRegistry)
type SchemaRegistry interface {
GetSchema(schemaID int) (*Schema, error)
GetLatestSchema(subject string, key bool) (*Schema, error)
CreateSchema(subject string, scheme string, schemaType SchemaType, key bool) (*Schema, error)
}

type Schema struct {
id int
schema string
version int
codec *goavro.Codec
}

func (schema *Schema) ID() int {
return schema.id
}

func (schema *Schema) Schema() string {
return schema.schema
}

func (schema *Schema) Version() int {
return schema.version
}

func (schema *Schema) Codec() *goavro.Codec {
return schema.codec
}

type SchemaType string

const (
Avro SchemaType = "AVRO"
)

func NewRegistry(url string) SchemaRegistry {
return &cSchemaRegistry{
client: srclient.CreateSchemaRegistryClient(url),
}
}

type cSchemaRegistry struct {
client *srclient.SchemaRegistryClient
}

func (c *cSchemaRegistry) GetSchema(schemaID int) (*Schema, error) {
cSchema, err := c.client.GetSchema(schemaID)
if err != nil {
return nil, err
}

return toSchema(cSchema), nil
}

func (c *cSchemaRegistry) GetLatestSchema(
subject string, key bool) (*Schema, error) {
cSchema, err := c.client.GetLatestSchema(subject, key)
if err != nil {
return nil, err
}

return toSchema(cSchema), nil
}

func (c *cSchemaRegistry) CreateSchema(
subject string, schema string,
schemaType SchemaType, key bool) (*Schema, error) {

cSchema, err := c.client.CreateSchema(
subject, schema, tocSchemaType(schemaType), key)
if err != nil {
return nil, err
}

return toSchema(cSchema), nil
}

func toSchema(cSchema *srclient.Schema) *Schema {
return &Schema{
id: cSchema.ID(),
schema: cSchema.Schema(),
version: cSchema.Version(),
codec: cSchema.Codec(),
}
}

func tocSchemaType(schemaType SchemaType) srclient.SchemaType {
switch schemaType {
case Avro:
return srclient.Avro
}

return ""
}

func GetSchemaWithRetry(
registry SchemaRegistry,
schemaId int,
attempts int,
) (
*Schema,
error,
) {
for i := 0; ; i++ {
schema, err := registry.GetSchema(schemaId)
if err == nil {
return schema, nil
}
if i >= (attempts - 1) {
return nil, fmt.Errorf(
"Failed to get schema by id: %d, err:%v\n", schemaId, err)
}
klog.Warningf(
"Retrying. Error fetching schema by id: %d err:%v\n",
schemaId, err)
sleepFor := rand.Intn(30-2+1) + 2
time.Sleep(time.Duration(sleepFor) * time.Second)
}
}

func GetLatestSchemaWithRetry(
registry SchemaRegistry,
topic string,
key bool,
attempts int,
) (
*Schema,
error,
) {
for i := 0; ; i++ {
schema, err := registry.GetLatestSchema(topic, key)
if err == nil {
return schema, nil
}
if i >= (attempts - 1) {
return nil, fmt.Errorf(
"Failed to get latest schema, topic: %s, err:%v\n", topic, err)
}
klog.Warningf(
"Retrying. Error getting latest schema, topic:%s, err:%v\n",
topic, err)
sleepFor := rand.Intn(30-2+1) + 2
time.Sleep(time.Duration(sleepFor) * time.Second)
}
}
53 changes: 0 additions & 53 deletions pkg/serializer/schema.go

This file was deleted.

12 changes: 8 additions & 4 deletions pkg/serializer/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"encoding/binary"
"fmt"
"github.com/Shopify/sarama"
"github.com/riferrei/srclient"
"github.com/practo/tipoca-stream/redshiftsink/pkg/schemaregistry"
)

const (
Expand Down Expand Up @@ -40,19 +40,23 @@ type Serializer interface {

func NewSerializer(schemaRegistryURL string) Serializer {
return &avroSerializer{
srclient: srclient.CreateSchemaRegistryClient(schemaRegistryURL),
registry: schemaregistry.NewRegistry(schemaRegistryURL),
}
}

type avroSerializer struct {
srclient *srclient.SchemaRegistryClient
registry schemaregistry.SchemaRegistry
}

func (c *avroSerializer) Deserialize(
message *sarama.ConsumerMessage) (*Message, error) {

schemaId := binary.BigEndian.Uint32(message.Value[1:5])
schema, err := GetSchemaWithRetry(c.srclient, int(schemaId), 10)
schema, err := schemaregistry.GetSchemaWithRetry(
c.registry,
int(schemaId),
10,
)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/transformer/debezium/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"fmt"
"github.com/practo/klog/v2"
"github.com/practo/tipoca-stream/redshiftsink/pkg/redshift"
"github.com/practo/tipoca-stream/redshiftsink/pkg/schemaregistry"
"github.com/practo/tipoca-stream/redshiftsink/pkg/serializer"
"github.com/practo/tipoca-stream/redshiftsink/pkg/transformer"
"github.com/practo/tipoca-stream/redshiftsink/pkg/transformer/masker"
"github.com/riferrei/srclient"
"strings"
)

Expand Down Expand Up @@ -44,7 +44,7 @@ type SourceType struct {
func NewSchemaTransformer(url string) transformer.SchemaTransformer {
return &schemaTransformer{
maskConfig: make(map[int]masker.MaskConfig),
srclient: srclient.CreateSchemaRegistryClient(url),
registry: schemaregistry.NewRegistry(url),
}
}

Expand Down Expand Up @@ -231,11 +231,11 @@ func (d *schemaParser) columnsBefore() []ColInfo {
type schemaTransformer struct {
mask bool
maskConfig map[int]masker.MaskConfig
srclient *srclient.SchemaRegistryClient
registry schemaregistry.SchemaRegistry
}

func (c *schemaTransformer) TransformKey(topic string) ([]string, error) {
s, err := serializer.GetLatestSchemaWithRetry(c.srclient, topic, true, 10)
s, err := schemaregistry.GetLatestSchemaWithRetry(c.registry, topic, true, 10)
if err != nil {
return []string{}, err
}
Expand Down Expand Up @@ -292,7 +292,7 @@ func isPrimaryKey(columnName string, primaryKeys []string) bool {
func (c *schemaTransformer) TransformValue(topic string, schemaId int,
maskSchema map[string]serializer.MaskInfo) (interface{}, error) {

s, err := serializer.GetSchemaWithRetry(c.srclient, schemaId, 10)
s, err := schemaregistry.GetSchemaWithRetry(c.registry, schemaId, 10)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/transformer/debezium/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestSchemaMysqlDataType(t *testing.T) {
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
c := &schemaTransformer{srclient: nil}
c := &schemaTransformer{registry: nil}

resp, err := c.transformSchemaValue(
tc.jobSchema, []string{"id"}, tc.maskSchema)
Expand Down

0 comments on commit 28c0e12

Please sign in to comment.