diff --git a/avro_encoder_decoder.go b/avro_encoder_decoder.go index 007c186..eb8ceb2 100644 --- a/avro_encoder_decoder.go +++ b/avro_encoder_decoder.go @@ -20,6 +20,7 @@ import ( "encoding/binary" "errors" "fmt" + avro "github.com/elodina/go-avro" ) @@ -42,11 +43,7 @@ type KafkaAvroEncoder struct { schemaRegistry SchemaRegistryClient } -func NewKafkaAvroEncoder(url string) *KafkaAvroEncoder { - return NewKafkaAvroEncoderAuth(url, nil) -} - -func NewKafkaAvroEncoderAuth(url string, auth *KafkaAvroAuth) *KafkaAvroEncoder { +func CreatePrimitiveSchemas() map[string]avro.Schema { primitiveSchemas := make(map[string]avro.Schema) primitiveSchemas["Null"] = createPrimitiveSchema("null") primitiveSchemas["Boolean"] = createPrimitiveSchema("boolean") @@ -56,10 +53,21 @@ func NewKafkaAvroEncoderAuth(url string, auth *KafkaAvroAuth) *KafkaAvroEncoder primitiveSchemas["Double"] = createPrimitiveSchema("double") primitiveSchemas["String"] = createPrimitiveSchema("string") primitiveSchemas["Bytes"] = createPrimitiveSchema("bytes") + return primitiveSchemas +} + +func NewKafkaAvroEncoder(url string) *KafkaAvroEncoder { + return NewKafkaAvroEncoderAuth(url, nil) +} + +func NewKafkaAvroEncoderAuth(url string, auth *KafkaAvroAuth) *KafkaAvroEncoder { + return NewKafkaAvroEncoderWithSchemaRegistryClient(NewCachedSchemaRegistryClientAuth(url, auth)) +} +func NewKafkaAvroEncoderWithSchemaRegistryClient(client SchemaRegistryClient) *KafkaAvroEncoder { return &KafkaAvroEncoder{ - schemaRegistry: NewCachedSchemaRegistryClientAuth(url, auth), - primitiveSchemas: primitiveSchemas, + schemaRegistry: client, + primitiveSchemas: CreatePrimitiveSchemas(), } } @@ -153,6 +161,12 @@ func NewKafkaAvroDecoderAuth(url string, auth *KafkaAvroAuth) *KafkaAvroDecoder } } +func NewKafkaAvroDecoderWithSchemaRegistryClient(client SchemaRegistryClient) *KafkaAvroDecoder { + return &KafkaAvroDecoder{ + schemaRegistry: client, + } +} + func (this *KafkaAvroDecoder) Decode(bytes []byte) (interface{}, error) { if bytes == nil { return nil, nil