From 16fafdb022b89522a98195e0257afb5e5c890da9 Mon Sep 17 00:00:00 2001 From: Kirmanie L Ravariere Date: Tue, 14 Jun 2022 01:20:26 -0700 Subject: [PATCH 1/2] Add support for using different subject name strategies --- avro.go | 28 ++- avro_test.go | 235 +++++++++++++++++- configuration.go | 11 +- jsonschema.go | 2 +- jsonschema_test.go | 4 +- schema_registry.go | 43 +++- schema_registry_test.go | 83 ++++++- scripts/helpers/schema_registry.js | 6 + ...st_avro_named_strategy_and_magic_prefix.js | 101 ++++++++ 9 files changed, 488 insertions(+), 25 deletions(-) create mode 100644 scripts/helpers/schema_registry.js create mode 100644 scripts/test_avro_named_strategy_and_magic_prefix.js diff --git a/avro.go b/avro.go index 907eec3..5006974 100644 --- a/avro.go +++ b/avro.go @@ -20,7 +20,12 @@ func SerializeAvro(configuration Configuration, topic string, data interface{}, bytesData := []byte(data.(string)) client := SchemaRegistryClientWithConfiguration(configuration.SchemaRegistry) - subject := topic + "-" + string(element) + + var subject, subjectNameError = GetSubjectName(schema, topic, element, configuration.Producer.SubjectNameStrategy) + if subjectNameError != nil { + return nil, subjectNameError + } + var schemaInfo *srclient.Schema schemaID := 0 @@ -86,22 +91,35 @@ func SerializeAvro(configuration Configuration, topic string, data interface{}, // is used to configure the Schema Registry client. The element is used to define the subject. // The data should be a byte array. func DeserializeAvro(configuration Configuration, topic string, data []byte, element Element, schema string, version int) (interface{}, *Xk6KafkaError) { - bytesDecodedData, err := DecodeWireFormat(data) + schemaID, bytesDecodedData, err := DecodeWireFormat(data) if err != nil { return nil, NewXk6KafkaError(failedDecodeFromWireFormat, "Failed to remove wire format from the binary data", err) } - client := SchemaRegistryClientWithConfiguration(configuration.SchemaRegistry) - subject := topic + "-" + string(element) var schemaInfo *srclient.Schema - var xk6KafkaError *Xk6KafkaError + client := SchemaRegistryClientWithConfiguration(configuration.SchemaRegistry) + + var subject, subjectNameError = GetSubjectName(schema, topic, element, configuration.Consumer.SubjectNameStrategy) + if subjectNameError != nil { + return nil, subjectNameError + } + if schema != "" { // Schema is provided, so we need to create it and get the schema ID schemaInfo, xk6KafkaError = CreateSchema(client, subject, schema, srclient.Avro) + } else if configuration.Consumer.UseMagicPrefix { + // Schema not provided and no valid version flag, so we use te schemaID in the magic prefix + schemaResult, err := client.GetSchema(schemaID) + schemaInfo = schemaResult + if err != nil { + xk6KafkaError = NewXk6KafkaError(failedCreateAvroCodec, + "Failed to get schema by magic prefix", + err) + } } else { // Schema is not provided, so we need to fetch the schema from the Schema Registry schemaInfo, xk6KafkaError = GetSchema(client, subject, schema, srclient.Avro, version) diff --git a/avro_test.go b/avro_test.go index 4748749..6ffefa4 100644 --- a/avro_test.go +++ b/avro_test.go @@ -1,6 +1,7 @@ package kafka import ( + "github.com/riferrei/srclient" "testing" "github.com/stretchr/testify/assert" @@ -51,7 +52,7 @@ func TestSerializeDeserializeAvroFailsOnSchemaError(t *testing.T) { assert.Equal(t, failedCreateAvroCodec, err.Code) // Deserialize the key or value - deserialized, err := DeserializeAvro(avroConfig, "topic", []byte{1, 2, 3, 4, 5, 6}, element, jsonSchema, 0) + deserialized, err := DeserializeAvro(avroConfig, "topic", []byte{0, 1, 2, 3, 4, 5}, element, jsonSchema, 0) assert.Nil(t, deserialized) assert.Error(t, err.Unwrap()) assert.Equal(t, "Failed to create codec for decoding Avro", err.Message) @@ -73,7 +74,7 @@ func TestSerializeDeserializeAvroFailsOnWireFormatError(t *testing.T) { // Deserialize a broken key or value // Proper wire-formatted message has 5 bytes (the wire format) plus data - deserialized, err = DeserializeAvro(avroConfig, "topic", []byte{1, 2, 3, 4}, element, schema, 0) + deserialized, err = DeserializeAvro(avroConfig, "topic", []byte{0, 1, 2, 3}, element, schema, 0) assert.Nil(t, deserialized) assert.Error(t, err.Unwrap()) assert.Equal(t, "Failed to remove wire format from the binary data", err.Message) @@ -92,10 +93,238 @@ func TestSerializeDeserializeAvroFailsOnEncodeDecodeError(t *testing.T) { assert.Equal(t, "Failed to encode data into Avro", err.Message) assert.Equal(t, failedEncodeToAvro, err.Code) - deserialized, err := DeserializeAvro(avroConfig, "topic", []byte{1, 2, 3, 4, 5, 6}, element, avroSchema, 0) + deserialized, err := DeserializeAvro(avroConfig, "topic", []byte{0, 1, 2, 3, 5}, element, avroSchema, 0) assert.Nil(t, deserialized) assert.Error(t, err.Unwrap()) assert.Equal(t, "Failed to decode data from Avro", err.Message) assert.Equal(t, failedDecodeAvroFromBinary, err.Code) } } + +func TestAvroSerializeTopicNameStrategy(t *testing.T) { + data := `{"field":"value"}` + topic := "TestAvroSerializeTopicNameStrategy-topic" + config := Configuration{ + Producer: ProducerConfiguration{ + ValueSerializer: AvroSerializer, + SubjectNameStrategy: "TopicNameStrategy", + }, + SchemaRegistry: SchemaRegistryConfiguration{ + Url: "http://localhost:8081", + }, + } + + schema := `{"type":"record","name":"TestAvroSerializeTopicNameStrategyIsDefaultStrategy","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}` + + serialized, err := SerializeAvro(config, topic, data, Value, schema, 0) + assert.Nil(t, err) + assert.NotNil(t, serialized) + + expectedSubject := topic + "-value" + srClient := SchemaRegistryClientWithConfiguration(config.SchemaRegistry) + schemaResult, err := GetSchema(srClient, expectedSubject, schema, srclient.Avro, 0) + assert.Nil(t, err) + assert.NotNil(t, schemaResult) +} + +func TestAvroSerializeTopicNameStrategyIsDefaultStrategy(t *testing.T) { + data := `{"field":"value"}` + topic := "TestAvroSerializeTopicNameStrategyIsDefaultStrategy-topic" + config := Configuration{ + Producer: ProducerConfiguration{ + ValueSerializer: AvroSerializer, + }, + SchemaRegistry: SchemaRegistryConfiguration{ + Url: "http://localhost:8081", + }, + } + + schema := `{"type":"record","name":"TestAvroSerializeTopicNameStrategyIsDefaultStrategy","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}` + + serialized, err := SerializeAvro(config, topic, data, Value, schema, 0) + assert.Nil(t, err) + assert.NotNil(t, serialized) + + expectedSubject := topic + "-value" + srClient := SchemaRegistryClientWithConfiguration(config.SchemaRegistry) + schemaResult, err := GetSchema(srClient, expectedSubject, schema, srclient.Avro, 0) + assert.Nil(t, err) + assert.NotNil(t, schemaResult) +} + +func TestAvroSerializeTopicRecordNameStrategy(t *testing.T) { + data := `{"field":"value"}` + topic := "TestAvroSerializeTopicRecordNameStrategy-topic" + config := Configuration{ + Producer: ProducerConfiguration{ + ValueSerializer: AvroSerializer, + SubjectNameStrategy: "TopicRecordNameStrategy", + }, + SchemaRegistry: SchemaRegistryConfiguration{ + Url: "http://localhost:8081", + }, + } + schema := `{"type":"record","name":"TestAvroSerializeTopicRecordNameStrategy","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}` + + serialized, err := SerializeAvro(config, topic, data, Value, schema, 0) + assert.Nil(t, err) + assert.NotNil(t, serialized) + + expectedSubject := topic + "-io.confluent.kafka.avro.TestAvroSerializeTopicRecordNameStrategy" + srClient := SchemaRegistryClientWithConfiguration(config.SchemaRegistry) + schemaResult, err := GetSchema(srClient, expectedSubject, schema, srclient.Avro, 0) + assert.Nil(t, err) + assert.NotNil(t, schemaResult) +} + +func TestAvroSerializeRecordNameStrategy(t *testing.T) { + data := `{"field":"value"}` + topic := "TestAvroSerializeRecordNameStrategy-topic" + config := Configuration{ + Producer: ProducerConfiguration{ + ValueSerializer: AvroSerializer, + SubjectNameStrategy: "RecordNameStrategy", + }, + SchemaRegistry: SchemaRegistryConfiguration{ + Url: "http://localhost:8081", + }, + } + schema := `{"type":"record","name":"TestAvroSerializeRecordNameStrategy","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}` + + serialized, err := SerializeAvro(config, topic, data, Value, schema, 0) + assert.Nil(t, err) + assert.NotNil(t, serialized) + + expectedSubject := "io.confluent.kafka.avro.TestAvroSerializeRecordNameStrategy" + srClient := SchemaRegistryClientWithConfiguration(config.SchemaRegistry) + resultSchema, err := GetSchema(srClient, expectedSubject, avroSchema, srclient.Avro, 0) + assert.Nil(t, err) + assert.NotNil(t, resultSchema) +} + +func TestAvroDeserializeUsingMagicPrefix(t *testing.T) { + data := `{"field":"value"}` + topic := "TestAvroDeserializeUsingMagicPrefix-topic" + config := Configuration{ + Consumer: ConsumerConfiguration{ + UseMagicPrefix: true, + }, + Producer: ProducerConfiguration{ + ValueSerializer: AvroSerializer, + SubjectNameStrategy: "RecordNameStrategy", + }, + SchemaRegistry: SchemaRegistryConfiguration{ + Url: "http://localhost:8081", + }, + } + schema := `{"type":"record","name":"TestAvroDeserializeUsingMagicPrefix","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}` + + serialized, err := SerializeAvro(config, topic, data, Value, schema, 0) + assert.Nil(t, err) + + dData, dErr := DeserializeAvro(config, topic, serialized, Value, "", 0) + assert.Equal(t, "value", dData.(map[string]interface{})["field"]) + assert.Nil(t, dErr) +} + +func TestAvroDeserializeUsingDefaultSubjectNameStrategy(t *testing.T) { + data := `{"field":"value"}` + topic := "TestAvroDeserializeUsingDefaultSubjectNameStrategy-topic" + config := Configuration{ + Producer: ProducerConfiguration{ + ValueSerializer: AvroSerializer, + }, + Consumer: ConsumerConfiguration{ + ValueDeserializer: AvroSerializer, + }, + SchemaRegistry: SchemaRegistryConfiguration{ + Url: "http://localhost:8081", + }, + } + schema := `{"type":"record","name":"TestAvroDeserializeUsingDefaultSubjectNameStrategy","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}` + + serialized, err := SerializeAvro(config, topic, data, Value, schema, 0) + assert.Nil(t, err) + + dData, dErr := DeserializeAvro(config, topic, serialized, Value, "", 0) + assert.Equal(t, "value", dData.(map[string]interface{})["field"]) + assert.Nil(t, dErr) +} + +func TestAvroDeserializeUsingSubjectNameStrategyRecordName(t *testing.T) { + data := `{"field":"value"}` + topic := "TestAvroDeserializeUsingSubjectNameStrategyRecordName-topic" + config := Configuration{ + Producer: ProducerConfiguration{ + ValueSerializer: AvroSerializer, + SubjectNameStrategy: "RecordNameStrategy", + }, + Consumer: ConsumerConfiguration{ + ValueDeserializer: AvroSerializer, + SubjectNameStrategy: "RecordNameStrategy", + }, + SchemaRegistry: SchemaRegistryConfiguration{ + Url: "http://localhost:8081", + }, + } + schema := `{"type":"record","name":"TestAvroDeserializeUsingSubjectNameStrategyRecordName","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}` + + serialized, err := SerializeAvro(config, topic, data, Value, schema, 0) + assert.Nil(t, err) + + dData, dErr := DeserializeAvro(config, topic, serialized, Value, schema, 0) + assert.Equal(t, "value", dData.(map[string]interface{})["field"]) + assert.Nil(t, dErr) +} + +func TestAvroDeserializeUsingSubjectNameStrategyTopicRecordName(t *testing.T) { + data := `{"field":"value"}` + topic := "TestAvroDeserializeUsingSubjectNameStrategyTopicRecordName-topic" + config := Configuration{ + Producer: ProducerConfiguration{ + ValueSerializer: AvroSerializer, + SubjectNameStrategy: "TopicRecordNameStrategy", + }, + Consumer: ConsumerConfiguration{ + ValueDeserializer: AvroSerializer, + SubjectNameStrategy: "TopicRecordNameStrategy", + }, + SchemaRegistry: SchemaRegistryConfiguration{ + Url: "http://localhost:8081", + }, + } + schema := `{"type":"record","name":"TestAvroDeserializeUsingSubjectNameStrategyTopicRecordName","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}` + + serialized, err := SerializeAvro(config, topic, data, Value, schema, 0) + assert.Nil(t, err) + + dData, dErr := DeserializeAvro(config, topic, serialized, Value, schema, 0) + assert.Equal(t, "value", dData.(map[string]interface{})["field"]) + assert.Nil(t, dErr) +} + +func TestAvroDeserializeUsingSubjectNameStrategyTopicName(t *testing.T) { + data := `{"field":"value"}` + topic := "TestAvroDeserializeUsingSubjectNameStrategyTopicName-topic" + config := Configuration{ + Producer: ProducerConfiguration{ + ValueSerializer: AvroSerializer, + SubjectNameStrategy: "TopicNameStrategy", + }, + Consumer: ConsumerConfiguration{ + ValueDeserializer: AvroSerializer, + SubjectNameStrategy: "TopicNameStrategy", + }, + SchemaRegistry: SchemaRegistryConfiguration{ + Url: "http://localhost:8081", + }, + } + schema := `{"type":"record","name":"TestAvroDeserializeUsingSubjectNameStrategyTopicName","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}` + + serialized, err := SerializeAvro(config, topic, data, Value, schema, 0) + assert.Nil(t, err) + + dData, dErr := DeserializeAvro(config, topic, serialized, Value, schema, 0) + assert.Equal(t, "value", dData.(map[string]interface{})["field"]) + assert.Nil(t, dErr) +} diff --git a/configuration.go b/configuration.go index b42df84..b0276c3 100644 --- a/configuration.go +++ b/configuration.go @@ -5,13 +5,16 @@ import ( ) type ConsumerConfiguration struct { - KeyDeserializer string `json:"keyDeserializer"` - ValueDeserializer string `json:"valueDeserializer"` + KeyDeserializer string `json:"keyDeserializer"` + ValueDeserializer string `json:"valueDeserializer"` + SubjectNameStrategy string `json:"subjectNameStrategy"` + UseMagicPrefix bool `json:"useMagicPrefix"` } type ProducerConfiguration struct { - KeySerializer string `json:"keySerializer"` - ValueSerializer string `json:"valueSerializer"` + KeySerializer string `json:"keySerializer"` + ValueSerializer string `json:"valueSerializer"` + SubjectNameStrategy string `json:"subjectNameStrategy"` } type Configuration struct { diff --git a/jsonschema.go b/jsonschema.go index 864a9f5..59e782e 100644 --- a/jsonschema.go +++ b/jsonschema.go @@ -88,7 +88,7 @@ func SerializeJson(configuration Configuration, topic string, data interface{}, // configuration is used to configure the Schema Registry client. The element is // used to define the subject. The data should be a byte array. func DeserializeJson(configuration Configuration, topic string, data []byte, element Element, schema string, version int) (interface{}, *Xk6KafkaError) { - bytesDecodedData, err := DecodeWireFormat(data) + _, bytesDecodedData, err := DecodeWireFormat(data) if err != nil { return nil, NewXk6KafkaError(failedDecodeFromWireFormat, "Failed to remove wire format from the binary data", diff --git a/jsonschema_test.go b/jsonschema_test.go index c93d6ed..6c1ac56 100644 --- a/jsonschema_test.go +++ b/jsonschema_test.go @@ -53,7 +53,7 @@ func TestSerializeDeserializeJsonFailsOnSchemaError(t *testing.T) { assert.Equal(t, failedCreateJsonSchemaCodec, err.Code) // Deserialize the key or value - deserialized, err := DeserializeJson(jsonConfig, "topic", []byte{1, 2, 3, 4, 5, 6}, element, schema, 0) + deserialized, err := DeserializeJson(jsonConfig, "topic", []byte{0, 2, 3, 4, 5, 6}, element, schema, 0) assert.Nil(t, deserialized) assert.Error(t, err.Unwrap()) assert.Equal(t, "Failed to create codec for decoding JSON data", err.Message) @@ -96,7 +96,7 @@ func TestSerializeDeserializeJsonFailsOnMarshalError(t *testing.T) { assert.Equal(t, "Failed to unmarshal JSON data", err.Message) assert.Equal(t, failedUnmarshalJson, err.Code) - deserialized, err := DeserializeJson(jsonConfig, "topic", []byte{1, 2, 3, 4, 5, 6}, element, jsonSchema, 0) + deserialized, err := DeserializeJson(jsonConfig, "topic", []byte{0, 2, 3, 4, 5, 6}, element, jsonSchema, 0) assert.Nil(t, deserialized) assert.Error(t, err.Unwrap()) assert.Equal(t, "Failed to unmarshal JSON data", err.Message) diff --git a/schema_registry.go b/schema_registry.go index 2f333f2..0d5f656 100644 --- a/schema_registry.go +++ b/schema_registry.go @@ -2,10 +2,11 @@ package kafka import ( "encoding/binary" - "net/http" - + "encoding/json" + "fmt" "github.com/riferrei/srclient" "github.com/sirupsen/logrus" + "net/http" ) type Element string @@ -30,12 +31,17 @@ type SchemaRegistryConfiguration struct { // DecodeWireFormat removes the proprietary 5-byte prefix from the Avro, ProtoBuf // or JSONSchema payload. // https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format -func DecodeWireFormat(message []byte) ([]byte, *Xk6KafkaError) { +func DecodeWireFormat(message []byte) (int, []byte, *Xk6KafkaError) { if len(message) < 5 { - return nil, NewXk6KafkaError(messageTooShort, + return 0, nil, NewXk6KafkaError(messageTooShort, "Invalid message: message too short to contain schema id.", nil) } - return message[5:], nil + if message[0] != 0 { + return 0, nil, NewXk6KafkaError(messageTooShort, + "Invalid message: invalid start byte.", nil) + } + magicPrefix := int(binary.BigEndian.Uint32(message[1:5])) + return magicPrefix, message[5:], nil } // EncodeWireFormat adds the proprietary 5-byte prefix to the Avro, ProtoBuf or @@ -103,3 +109,30 @@ func CreateSchema( } return schemaInfo, nil } + +func GetSubjectName(schema string, topic string, element Element, subjectNameStrategy string) (string, *Xk6KafkaError) { + if subjectNameStrategy == "" || subjectNameStrategy == "TopicNameStrategy" { + return topic + "-" + string(element), nil + } + + var schemaMap map[string]interface{} + err := json.Unmarshal([]byte(schema), &schemaMap) + if err != nil { + return "", NewXk6KafkaError(failedEncodeToAvro, "Schema invalid json", nil) + } + var recordName = "" + if namespace, ok := schemaMap["namespace"]; ok { + recordName = namespace.(string) + "." + } + recordName += schemaMap["name"].(string) + + if subjectNameStrategy == "RecordNameStrategy" { + return recordName, nil + } + if subjectNameStrategy == "TopicRecordNameStrategy" { + return topic + "-" + recordName, nil + } + + return "", NewXk6KafkaError(failedEncodeToAvro, fmt.Sprintf( + "Unknown subject name strategy: %v", subjectNameStrategy), nil) +} diff --git a/schema_registry_test.go b/schema_registry_test.go index 8667234..0801992 100644 --- a/schema_registry_test.go +++ b/schema_registry_test.go @@ -9,20 +9,22 @@ import ( // TestDecodeWireFormat tests the decoding of a wire-formatted message. func TestDecodeWireFormat(t *testing.T) { - encoded := []byte{1, 2, 3, 4, 5, 6} - decoded := []byte{6} + encoded := []byte{0, 1, 2, 3, 4, 5} + decoded := []byte{5} + prefix := 16909060 - result, err := DecodeWireFormat(encoded) + magic, result, err := DecodeWireFormat(encoded) assert.Nil(t, err) assert.Equal(t, decoded, result) + assert.Equal(t, magic, prefix) } // TestDecodeWireFormatFails tests the decoding of a wire-formatted message and // fails because the message is too short. func TestDecodeWireFormatFails(t *testing.T) { - encoded := []byte{1, 2, 3, 4} // too short + encoded := []byte{0, 1, 2, 3} // too short - result, err := DecodeWireFormat(encoded) + _, result, err := DecodeWireFormat(encoded) assert.Nil(t, result) assert.NotNil(t, err) assert.Equal(t, "Invalid message: message too short to contain schema id.", err.Message) @@ -123,3 +125,74 @@ func TestCreateSchemaFails(t *testing.T) { assert.NotNil(t, err) assert.Equal(t, "Failed to create schema.", err.Message) } + +func TestGetSubjectNameFailsIfInvalidSchema(t *testing.T) { + _, err := GetSubjectName(`Bad Schema`, "test-topic", Value, "RecordNameStrategy") + assert.NotNil(t, err) + assert.Contains(t, err.Message, "Schema invalid") +} + +func TestGetSubjectNameFailsIfSubjectNameStrategyUnknown(t *testing.T) { + avroSchema := `{"type":"record","name":"Schema","fields":[{"name":"field","type":"string"}]}` + _, err := GetSubjectName(avroSchema, "test-topic", Value, "Unknown") + assert.NotNil(t, err) + assert.Contains(t, err.Message, "Unknown subject name strategy") +} + +func TestGetSubjectNameCanUseDefaultSubjectNameStrategy(t *testing.T) { + avroSchema := `{"type":"record","name":"Schema","fields":[{"name":"field","type":"string"}]}` + for _, element := range []Element{Key, Value} { + subject, err := GetSubjectName(avroSchema, "test-topic", element, "") + assert.Nil(t, err) + assert.Equal(t, "test-topic-"+string(element), subject) + } +} + +func TestGetSubjectNameCanUseTopicNameStrategy(t *testing.T) { + avroSchema := `{"type":"record","name":"Schema","fields":[{"name":"field","type":"string"}]}` + + for _, element := range []Element{Key, Value} { + subject, err := GetSubjectName(avroSchema, "test-topic", element, "TopicNameStrategy") + assert.Nil(t, err) + assert.Equal(t, "test-topic-"+string(element), subject) + } +} + +func TestGetSubjectNameCanUseTopicRecordNameStrategyWithNamespace(t *testing.T) { + avroSchema := `{"type":"record","namespace":"com.example.person","name":"Schema","fields":[{"name":"field","type":"string"}]}` + subject, err := GetSubjectName(avroSchema, "test-topic", Value, "TopicRecordNameStrategy") + assert.Nil(t, err) + assert.Equal(t, "test-topic-com.example.person.Schema", subject) +} + +func TestGetSubjectNameCanUseTopicRecordNameStrategyWithoutNamespace(t *testing.T) { + avroSchema := `{"type":"record","name":"Schema","fields":[{"name":"field","type":"string"}]}` + subject, err := GetSubjectName(avroSchema, "test-topic", Value, "TopicRecordNameStrategy") + assert.Nil(t, err) + assert.Equal(t, "test-topic-Schema", subject) +} + +func TestGetSubjectNameCanUseRecordNameStrategyWithoutNamespace(t *testing.T) { + avroSchema := `{"type":"record","name":"Schema","fields":[{"name":"field","type":"string"}]}` + subject, err := GetSubjectName(avroSchema, "test-topic", Value, "RecordNameStrategy") + assert.Nil(t, err) + assert.Equal(t, "Schema", subject) +} + +func TestGetSubjectNameCanUseRecordNameStrategyWithNamespace(t *testing.T) { + avroSchema := `{"type":"record","namespace":"com.example.person","name":"Schema","fields":[{"name":"field","type":"string"}]}` + subject, err := GetSubjectName(avroSchema, "test-topic", Value, "RecordNameStrategy") + assert.Nil(t, err) + assert.Equal(t, "com.example.person.Schema", subject) +} + +func TestDecodeWireRequiresMagicPrefixByte(t *testing.T) { + encoded := []byte{1, 1, 2, 3, 4} // too short + + _, result, err := DecodeWireFormat(encoded) + assert.Nil(t, result) + assert.NotNil(t, err) + assert.Equal(t, "Invalid message: invalid start byte.", err.Message) + assert.Equal(t, messageTooShort, err.Code) + assert.Nil(t, err.Unwrap()) +} diff --git a/scripts/helpers/schema_registry.js b/scripts/helpers/schema_registry.js new file mode 100644 index 0000000..4747d4e --- /dev/null +++ b/scripts/helpers/schema_registry.js @@ -0,0 +1,6 @@ +import http from 'k6/http'; + +export function getSubject(subject){ + return http.get(`http://localhost:8081/subjects/${subject}/versions/1`) +} + diff --git a/scripts/test_avro_named_strategy_and_magic_prefix.js b/scripts/test_avro_named_strategy_and_magic_prefix.js new file mode 100644 index 0000000..e697f50 --- /dev/null +++ b/scripts/test_avro_named_strategy_and_magic_prefix.js @@ -0,0 +1,101 @@ +/* +This is a k6 test script that imports the xk6-kafka and +tests Kafka with a 100 Avro messages per iteration. +*/ + +import {check} from "k6"; +import { + writer, + reader, + consumeWithConfiguration, + produceWithConfiguration, + createTopic, + deleteTopic, +} from "k6/x/kafka"; +import {getSubject} from "./helpers/schema_registry.js"; + +const bootstrapServers = ["localhost:9092"]; +const kafkaTopic = "test_schema_registry_consume_magic_prefix"; + +const [producer, _writerError] = writer(bootstrapServers, kafkaTopic, null); +const [consumer, _readerError] = reader(bootstrapServers, kafkaTopic, null, "", null, null); + +let configuration = JSON.stringify( + { + consumer: { + keyDeserializer: "", + valueDeserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer", + userMagicPrefix: true, + }, + producer: { + keySerializer: "", + valueSerializer: "io.confluent.kafka.serializers.KafkaAvroSerializer", + subjectNameStrategy: "RecordNameStrategy" + }, + schemaRegistry: { + url: "http://localhost:8081", + }, + }); + +if (__VU == 0) { + createTopic(bootstrapServers[0], kafkaTopic); +} + +export default function () { + let message = { + value: JSON.stringify({ + firstname: "firstname", + lastname: "lastname", + }), + } + const valueSchema = JSON.stringify({ + "name": "MagicNameValueSchema", + "type": "record", + "namespace": "com.example", + "fields": [ + { + "name": "firstname", + "type": "string" + }, + { + "name": "lastname", + "type": "string" + } + ] + }); + let error = produceWithConfiguration(producer, [message], configuration, null, valueSchema); + + check(error, { + "is sent": (err) => err == undefined, + }); + + check(getSubject("com.example.MagicNameValueSchema"), + { 'status is 200': (r) => r.status === 200 }); + + let [messages, _consumeError] = consumeWithConfiguration( + consumer, + 1, + configuration, + null, + valueSchema + ); + check(messages, { + "1 message returned": (msgs) => msgs.length === 1, + }); +} + +export function teardown(data) { + if (__VU == 0) { + // Delete the kafkaTopic + const error = deleteTopic(bootstrapServers[0], kafkaTopic); + if (error === undefined) { + // If no error returns, it means that the kafkaTopic + // is successfully deleted + console.log("Topic deleted successfully"); + } else { + console.log("Error while deleting kafkaTopic: ", error); + } + } + producer.close(); + consumer.close(); +} From c05b86a113f4a6c86e37f056cb8dcede0b99a35e Mon Sep 17 00:00:00 2001 From: Kirmanie L Ravariere Date: Thu, 16 Jun 2022 09:07:14 -0700 Subject: [PATCH 2/2] Export subject name strategies to constants and clean schema unmarshal error. --- avro.go | 8 ++++---- avro_test.go | 20 ++++++++++---------- error_codes.go | 1 + schema_registry.go | 14 ++++++++++---- schema_registry_test.go | 14 +++++++------- 5 files changed, 32 insertions(+), 25 deletions(-) diff --git a/avro.go b/avro.go index 5006974..587cb56 100644 --- a/avro.go +++ b/avro.go @@ -100,6 +100,7 @@ func DeserializeAvro(configuration Configuration, topic string, data []byte, ele var schemaInfo *srclient.Schema var xk6KafkaError *Xk6KafkaError + var getSchemaError error client := SchemaRegistryClientWithConfiguration(configuration.SchemaRegistry) @@ -113,12 +114,11 @@ func DeserializeAvro(configuration Configuration, topic string, data []byte, ele schemaInfo, xk6KafkaError = CreateSchema(client, subject, schema, srclient.Avro) } else if configuration.Consumer.UseMagicPrefix { // Schema not provided and no valid version flag, so we use te schemaID in the magic prefix - schemaResult, err := client.GetSchema(schemaID) - schemaInfo = schemaResult - if err != nil { + schemaInfo, getSchemaError = client.GetSchema(schemaID) + if getSchemaError != nil { xk6KafkaError = NewXk6KafkaError(failedCreateAvroCodec, "Failed to get schema by magic prefix", - err) + getSchemaError) } } else { // Schema is not provided, so we need to fetch the schema from the Schema Registry diff --git a/avro_test.go b/avro_test.go index 6ffefa4..3105190 100644 --- a/avro_test.go +++ b/avro_test.go @@ -107,7 +107,7 @@ func TestAvroSerializeTopicNameStrategy(t *testing.T) { config := Configuration{ Producer: ProducerConfiguration{ ValueSerializer: AvroSerializer, - SubjectNameStrategy: "TopicNameStrategy", + SubjectNameStrategy: TopicNameStrategy, }, SchemaRegistry: SchemaRegistryConfiguration{ Url: "http://localhost:8081", @@ -158,7 +158,7 @@ func TestAvroSerializeTopicRecordNameStrategy(t *testing.T) { config := Configuration{ Producer: ProducerConfiguration{ ValueSerializer: AvroSerializer, - SubjectNameStrategy: "TopicRecordNameStrategy", + SubjectNameStrategy: TopicRecordNameStrategy, }, SchemaRegistry: SchemaRegistryConfiguration{ Url: "http://localhost:8081", @@ -183,7 +183,7 @@ func TestAvroSerializeRecordNameStrategy(t *testing.T) { config := Configuration{ Producer: ProducerConfiguration{ ValueSerializer: AvroSerializer, - SubjectNameStrategy: "RecordNameStrategy", + SubjectNameStrategy: RecordNameStrategy, }, SchemaRegistry: SchemaRegistryConfiguration{ Url: "http://localhost:8081", @@ -211,7 +211,7 @@ func TestAvroDeserializeUsingMagicPrefix(t *testing.T) { }, Producer: ProducerConfiguration{ ValueSerializer: AvroSerializer, - SubjectNameStrategy: "RecordNameStrategy", + SubjectNameStrategy: RecordNameStrategy, }, SchemaRegistry: SchemaRegistryConfiguration{ Url: "http://localhost:8081", @@ -257,11 +257,11 @@ func TestAvroDeserializeUsingSubjectNameStrategyRecordName(t *testing.T) { config := Configuration{ Producer: ProducerConfiguration{ ValueSerializer: AvroSerializer, - SubjectNameStrategy: "RecordNameStrategy", + SubjectNameStrategy: RecordNameStrategy, }, Consumer: ConsumerConfiguration{ ValueDeserializer: AvroSerializer, - SubjectNameStrategy: "RecordNameStrategy", + SubjectNameStrategy: RecordNameStrategy, }, SchemaRegistry: SchemaRegistryConfiguration{ Url: "http://localhost:8081", @@ -283,11 +283,11 @@ func TestAvroDeserializeUsingSubjectNameStrategyTopicRecordName(t *testing.T) { config := Configuration{ Producer: ProducerConfiguration{ ValueSerializer: AvroSerializer, - SubjectNameStrategy: "TopicRecordNameStrategy", + SubjectNameStrategy: TopicRecordNameStrategy, }, Consumer: ConsumerConfiguration{ ValueDeserializer: AvroSerializer, - SubjectNameStrategy: "TopicRecordNameStrategy", + SubjectNameStrategy: TopicRecordNameStrategy, }, SchemaRegistry: SchemaRegistryConfiguration{ Url: "http://localhost:8081", @@ -309,11 +309,11 @@ func TestAvroDeserializeUsingSubjectNameStrategyTopicName(t *testing.T) { config := Configuration{ Producer: ProducerConfiguration{ ValueSerializer: AvroSerializer, - SubjectNameStrategy: "TopicNameStrategy", + SubjectNameStrategy: TopicNameStrategy, }, Consumer: ConsumerConfiguration{ ValueDeserializer: AvroSerializer, - SubjectNameStrategy: "TopicNameStrategy", + SubjectNameStrategy: TopicNameStrategy, }, SchemaRegistry: SchemaRegistryConfiguration{ Url: "http://localhost:8081", diff --git a/error_codes.go b/error_codes.go index 8f0e194..c0dd1e7 100644 --- a/error_codes.go +++ b/error_codes.go @@ -28,6 +28,7 @@ const ( failedEncodeToJson errCode = 2009 failedEncodeJsonToBinary errCode = 2010 failedDecodeJsonFromBinary errCode = 2011 + failedToUnmarshalSchema errCode = 2012 // producer failedWriteMessage errCode = 3000 diff --git a/schema_registry.go b/schema_registry.go index 0d5f656..1baa252 100644 --- a/schema_registry.go +++ b/schema_registry.go @@ -28,6 +28,12 @@ type SchemaRegistryConfiguration struct { TLSConfig *TLSConfig `json:"tlsConfig"` } +const ( + TopicNameStrategy string = "TopicNameStrategy" + RecordNameStrategy string = "RecordNameStrategy" + TopicRecordNameStrategy string = "TopicRecordNameStrategy" +) + // DecodeWireFormat removes the proprietary 5-byte prefix from the Avro, ProtoBuf // or JSONSchema payload. // https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format @@ -111,14 +117,14 @@ func CreateSchema( } func GetSubjectName(schema string, topic string, element Element, subjectNameStrategy string) (string, *Xk6KafkaError) { - if subjectNameStrategy == "" || subjectNameStrategy == "TopicNameStrategy" { + if subjectNameStrategy == "" || subjectNameStrategy == TopicNameStrategy { return topic + "-" + string(element), nil } var schemaMap map[string]interface{} err := json.Unmarshal([]byte(schema), &schemaMap) if err != nil { - return "", NewXk6KafkaError(failedEncodeToAvro, "Schema invalid json", nil) + return "", NewXk6KafkaError(failedToUnmarshalSchema, "Failed to unmarshal schema", nil) } var recordName = "" if namespace, ok := schemaMap["namespace"]; ok { @@ -126,10 +132,10 @@ func GetSubjectName(schema string, topic string, element Element, subjectNameStr } recordName += schemaMap["name"].(string) - if subjectNameStrategy == "RecordNameStrategy" { + if subjectNameStrategy == RecordNameStrategy { return recordName, nil } - if subjectNameStrategy == "TopicRecordNameStrategy" { + if subjectNameStrategy == TopicRecordNameStrategy { return topic + "-" + recordName, nil } diff --git a/schema_registry_test.go b/schema_registry_test.go index 0801992..71c6cc9 100644 --- a/schema_registry_test.go +++ b/schema_registry_test.go @@ -127,9 +127,9 @@ func TestCreateSchemaFails(t *testing.T) { } func TestGetSubjectNameFailsIfInvalidSchema(t *testing.T) { - _, err := GetSubjectName(`Bad Schema`, "test-topic", Value, "RecordNameStrategy") + _, err := GetSubjectName(`Bad Schema`, "test-topic", Value, RecordNameStrategy) assert.NotNil(t, err) - assert.Contains(t, err.Message, "Schema invalid") + assert.Contains(t, err.Message, "Failed to unmarshal schema") } func TestGetSubjectNameFailsIfSubjectNameStrategyUnknown(t *testing.T) { @@ -152,7 +152,7 @@ func TestGetSubjectNameCanUseTopicNameStrategy(t *testing.T) { avroSchema := `{"type":"record","name":"Schema","fields":[{"name":"field","type":"string"}]}` for _, element := range []Element{Key, Value} { - subject, err := GetSubjectName(avroSchema, "test-topic", element, "TopicNameStrategy") + subject, err := GetSubjectName(avroSchema, "test-topic", element, TopicNameStrategy) assert.Nil(t, err) assert.Equal(t, "test-topic-"+string(element), subject) } @@ -160,28 +160,28 @@ func TestGetSubjectNameCanUseTopicNameStrategy(t *testing.T) { func TestGetSubjectNameCanUseTopicRecordNameStrategyWithNamespace(t *testing.T) { avroSchema := `{"type":"record","namespace":"com.example.person","name":"Schema","fields":[{"name":"field","type":"string"}]}` - subject, err := GetSubjectName(avroSchema, "test-topic", Value, "TopicRecordNameStrategy") + subject, err := GetSubjectName(avroSchema, "test-topic", Value, TopicRecordNameStrategy) assert.Nil(t, err) assert.Equal(t, "test-topic-com.example.person.Schema", subject) } func TestGetSubjectNameCanUseTopicRecordNameStrategyWithoutNamespace(t *testing.T) { avroSchema := `{"type":"record","name":"Schema","fields":[{"name":"field","type":"string"}]}` - subject, err := GetSubjectName(avroSchema, "test-topic", Value, "TopicRecordNameStrategy") + subject, err := GetSubjectName(avroSchema, "test-topic", Value, TopicRecordNameStrategy) assert.Nil(t, err) assert.Equal(t, "test-topic-Schema", subject) } func TestGetSubjectNameCanUseRecordNameStrategyWithoutNamespace(t *testing.T) { avroSchema := `{"type":"record","name":"Schema","fields":[{"name":"field","type":"string"}]}` - subject, err := GetSubjectName(avroSchema, "test-topic", Value, "RecordNameStrategy") + subject, err := GetSubjectName(avroSchema, "test-topic", Value, RecordNameStrategy) assert.Nil(t, err) assert.Equal(t, "Schema", subject) } func TestGetSubjectNameCanUseRecordNameStrategyWithNamespace(t *testing.T) { avroSchema := `{"type":"record","namespace":"com.example.person","name":"Schema","fields":[{"name":"field","type":"string"}]}` - subject, err := GetSubjectName(avroSchema, "test-topic", Value, "RecordNameStrategy") + subject, err := GetSubjectName(avroSchema, "test-topic", Value, RecordNameStrategy) assert.Nil(t, err) assert.Equal(t, "com.example.person.Schema", subject) }