diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java index f0d9767b21e904..81bf03173926dc 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java @@ -50,7 +50,7 @@ /** * Kafka Source that transfers the data from Kafka to Pulsar and sets the Schema type properly. - * We the key and the value deserializer in order to decide the type of Schema to be set on the topic on Pulsar. + * We use the key and the value deserializer in order to decide the type of Schema to be set on the topic on Pulsar. * In case of KafkaAvroDeserializer we use the Schema Registry to download the schema and apply it to the topic. * Please refer to {@link #getSchemaFromDeserializerAndAdaptConfiguration(String, Properties, boolean)} for the list * of supported Deserializers. @@ -135,6 +135,8 @@ public KafkaRecord buildRecord(ConsumerRecord consumerRecord) { private static ByteBuffer extractSimpleValue(Object value) { // we have substituted the original Deserializer with // ByteBufferDeserializer in order to save memory copies + // so here we can have only a ByteBuffer or at most a + // BytesWithKafkaSchema in case of ExtractKafkaAvroSchemaDeserializer if (value == null) { return null; } else if (value instanceof BytesWithKafkaSchema) { @@ -152,11 +154,11 @@ private Schema getSchemaFromObject(Object value, Schema fallback) { // the schema may be different from record to record return schemaCache.get(((BytesWithKafkaSchema) value).getSchemaId()); } else { - return new ByteBufferSchemaWrapper(fallback); + return fallback; } } - private static Schema getSchemaFromDeserializerAndAdaptConfiguration(String key, Properties props, boolean isKey) { + private static Schema getSchemaFromDeserializerAndAdaptConfiguration(String key, Properties props, boolean isKey) { String kafkaDeserializerClass = props.getProperty(key); Objects.requireNonNull(kafkaDeserializerClass); @@ -165,26 +167,27 @@ private static Schema getSchemaFromDeserializerAndAdaptConfiguration(String k // to pass the original ByteBuffer props.put(key, ByteBufferDeserializer.class.getCanonicalName()); + Schema result; if (ByteArrayDeserializer.class.getName().equals(kafkaDeserializerClass) || ByteBufferDeserializer.class.getName().equals(kafkaDeserializerClass) || BytesDeserializer.class.getName().equals(kafkaDeserializerClass)) { - return Schema.BYTEBUFFER; + result = Schema.BYTEBUFFER; } else if (StringDeserializer.class.getName().equals(kafkaDeserializerClass)) { if (isKey) { - // for the key we keep the String + // for the key we use the String value and we want StringDeserializer props.put(key, kafkaDeserializerClass); } - return Schema.STRING; + result = Schema.STRING; } else if (DoubleDeserializer.class.getName().equals(kafkaDeserializerClass)) { - return Schema.DOUBLE; + result = Schema.DOUBLE; } else if (FloatDeserializer.class.getName().equals(kafkaDeserializerClass)) { - return Schema.FLOAT; + result = Schema.FLOAT; } else if (IntegerDeserializer.class.getName().equals(kafkaDeserializerClass)) { - return Schema.INT32; + result = Schema.INT32; } else if (LongDeserializer.class.getName().equals(kafkaDeserializerClass)) { - return Schema.INT64; + result = Schema.INT64; } else if (ShortDeserializer.class.getName().equals(kafkaDeserializerClass)) { - return Schema.INT16; + result = Schema.INT16; } else if (KafkaAvroDeserializer.class.getName().equals(kafkaDeserializerClass)){ // in this case we have to inject our custom deserializer // that extracts Avro schema information @@ -195,6 +198,7 @@ private static Schema getSchemaFromDeserializerAndAdaptConfiguration(String k } else { throw new IllegalArgumentException("Unsupported deserializer "+kafkaDeserializerClass); } + return new ByteBufferSchemaWrapper(result); } Schema getKeySchema() { @@ -229,9 +233,14 @@ public BytesWithKafkaSchema deserialize(String topic, byte[] payload) { } } - private static final class DeferredSchemaPlaceholder extends ByteBufferSchemaWrapper { + static final class DeferredSchemaPlaceholder extends ByteBufferSchemaWrapper { DeferredSchemaPlaceholder() { - super(SchemaInfo.builder().build()); + super(SchemaInfo + .builder() + .type(SchemaType.AVRO) + .properties(Collections.emptyMap()) + .schema(new byte[0]) + .build()); } static final DeferredSchemaPlaceholder INSTANCE = new DeferredSchemaPlaceholder(); } diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaBytesSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaBytesSourceTest.java index ca57c7ac9f0637..51500de6be6cbb 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaBytesSourceTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaBytesSourceTest.java @@ -84,7 +84,7 @@ public void testNoKeyValueSchema() throws Exception { ShortDeserializer.class.getName(), Schema.INT16); validateSchemaNoKeyValue(StringDeserializer.class.getName(), Schema.STRING, - KafkaAvroDeserializer.class.getName(), Schema.AUTO_PRODUCE_BYTES()); + KafkaAvroDeserializer.class.getName(), KafkaBytesSource.DeferredSchemaPlaceholder.INSTANCE); } @@ -104,22 +104,22 @@ private void validateSchemaNoKeyValue(String keyDeserializationClass, Schema exp assertFalse(source.isProduceKeyValue()); Schema keySchema = source.getKeySchema(); Schema valueSchema = source.getValueSchema(); - assertEquals(keySchema.getClass(), expectedKeySchema.getClass()); - assertEquals(valueSchema.getClass(), expectedValueSchema.getClass()); + assertEquals(keySchema.getSchemaInfo().getType(), expectedKeySchema.getSchemaInfo().getType()); + assertEquals(valueSchema.getSchemaInfo().getType(), expectedValueSchema.getSchemaInfo().getType()); } @Test public void testKeyValueSchema() throws Exception { validateSchemaKeyValue(IntegerDeserializer.class.getName(), Schema.INT32, StringDeserializer.class.getName(), Schema.STRING, - new IntegerSerializer().serialize("test", 10), - new StringSerializer().serialize("test", "test")); + ByteBuffer.wrap(new IntegerSerializer().serialize("test", 10)), + ByteBuffer.wrap(new StringSerializer().serialize("test", "test"))); } private void validateSchemaKeyValue(String keyDeserializationClass, Schema expectedKeySchema, String valueDeserializationClass, Schema expectedValueSchema, - byte[] key, - byte[] value) throws Exception { + ByteBuffer key, + ByteBuffer value) throws Exception { KafkaBytesSource source = new KafkaBytesSource(); Map config = new HashMap<>(); config.put("topic","test"); @@ -134,8 +134,8 @@ private void validateSchemaKeyValue(String keyDeserializationClass, Schema expec assertTrue(source.isProduceKeyValue()); Schema keySchema = source.getKeySchema(); Schema valueSchema = source.getValueSchema(); - assertEquals(keySchema.getClass(), expectedKeySchema.getClass()); - assertEquals(valueSchema.getClass(), expectedValueSchema.getClass()); + assertEquals(keySchema.getSchemaInfo().getType(), expectedKeySchema.getSchemaInfo().getType()); + assertEquals(valueSchema.getSchemaInfo().getType(), expectedValueSchema.getSchemaInfo().getType()); KafkaAbstractSource.KafkaRecord record = source.buildRecord(new ConsumerRecord("test", 0, 0, key, value)); assertThat(record, instanceOf(KafkaAbstractSource.KeyValueKafkaRecord.class)); @@ -144,14 +144,14 @@ private void validateSchemaKeyValue(String keyDeserializationClass, Schema expec assertSame(valueSchema, kvRecord.getValueSchema()); assertEquals(KeyValueEncodingType.SEPARATED, kvRecord.getKeyValueEncodingType()); KeyValue kvValue = (KeyValue) kvRecord.getValue(); - log.info("key {}", Arrays.toString(key)); - log.info("value {}", Arrays.toString(value)); + log.info("key {}", Arrays.toString(toArray(key))); + log.info("value {}", Arrays.toString(toArray(value))); log.info("key {}", Arrays.toString(toArray((ByteBuffer) kvValue.getKey()))); log.info("value {}", Arrays.toString(toArray((ByteBuffer) kvValue.getValue()))); - assertEquals(ByteBuffer.wrap(key).compareTo((ByteBuffer) kvValue.getKey()), 0); - assertEquals(ByteBuffer.wrap(value).compareTo((ByteBuffer) kvValue.getValue()), 0); + assertEquals(ByteBuffer.wrap(toArray(key)).compareTo((ByteBuffer) kvValue.getKey()), 0); + assertEquals(ByteBuffer.wrap(toArray(value)).compareTo((ByteBuffer) kvValue.getValue()), 0); } private static byte[] toArray(ByteBuffer b) {