Skip to content

Commit

Permalink
more clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed Mar 22, 2021
1 parent 31b3ba9 commit 63b088a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@

/**
* Kafka Source that transfers the data from Kafka to Pulsar and sets the Schema type properly.
* We the key and the value deserializer in order to decide the type of Schema to be set on the topic on Pulsar.
* We use the key and the value deserializer in order to decide the type of Schema to be set on the topic on Pulsar.
* In case of KafkaAvroDeserializer we use the Schema Registry to download the schema and apply it to the topic.
* Please refer to {@link #getSchemaFromDeserializerAndAdaptConfiguration(String, Properties, boolean)} for the list
* of supported Deserializers.
Expand Down Expand Up @@ -135,6 +135,8 @@ public KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord) {
private static ByteBuffer extractSimpleValue(Object value) {
// we have substituted the original Deserializer with
// ByteBufferDeserializer in order to save memory copies
// so here we can have only a ByteBuffer or at most a
// BytesWithKafkaSchema in case of ExtractKafkaAvroSchemaDeserializer
if (value == null) {
return null;
} else if (value instanceof BytesWithKafkaSchema) {
Expand All @@ -152,11 +154,11 @@ private Schema<ByteBuffer> getSchemaFromObject(Object value, Schema fallback) {
// the schema may be different from record to record
return schemaCache.get(((BytesWithKafkaSchema) value).getSchemaId());
} else {
return new ByteBufferSchemaWrapper(fallback);
return fallback;
}
}

private static Schema<?> getSchemaFromDeserializerAndAdaptConfiguration(String key, Properties props, boolean isKey) {
private static Schema<ByteBuffer> getSchemaFromDeserializerAndAdaptConfiguration(String key, Properties props, boolean isKey) {
String kafkaDeserializerClass = props.getProperty(key);
Objects.requireNonNull(kafkaDeserializerClass);

Expand All @@ -165,26 +167,27 @@ private static Schema<?> getSchemaFromDeserializerAndAdaptConfiguration(String k
// to pass the original ByteBuffer
props.put(key, ByteBufferDeserializer.class.getCanonicalName());

Schema<?> result;
if (ByteArrayDeserializer.class.getName().equals(kafkaDeserializerClass)
|| ByteBufferDeserializer.class.getName().equals(kafkaDeserializerClass)
|| BytesDeserializer.class.getName().equals(kafkaDeserializerClass)) {
return Schema.BYTEBUFFER;
result = Schema.BYTEBUFFER;
} else if (StringDeserializer.class.getName().equals(kafkaDeserializerClass)) {
if (isKey) {
// for the key we keep the String
// for the key we use the String value and we want StringDeserializer
props.put(key, kafkaDeserializerClass);
}
return Schema.STRING;
result = Schema.STRING;
} else if (DoubleDeserializer.class.getName().equals(kafkaDeserializerClass)) {
return Schema.DOUBLE;
result = Schema.DOUBLE;
} else if (FloatDeserializer.class.getName().equals(kafkaDeserializerClass)) {
return Schema.FLOAT;
result = Schema.FLOAT;
} else if (IntegerDeserializer.class.getName().equals(kafkaDeserializerClass)) {
return Schema.INT32;
result = Schema.INT32;
} else if (LongDeserializer.class.getName().equals(kafkaDeserializerClass)) {
return Schema.INT64;
result = Schema.INT64;
} else if (ShortDeserializer.class.getName().equals(kafkaDeserializerClass)) {
return Schema.INT16;
result = Schema.INT16;
} else if (KafkaAvroDeserializer.class.getName().equals(kafkaDeserializerClass)){
// in this case we have to inject our custom deserializer
// that extracts Avro schema information
Expand All @@ -195,6 +198,7 @@ private static Schema<?> getSchemaFromDeserializerAndAdaptConfiguration(String k
} else {
throw new IllegalArgumentException("Unsupported deserializer "+kafkaDeserializerClass);
}
return new ByteBufferSchemaWrapper(result);
}

Schema getKeySchema() {
Expand Down Expand Up @@ -229,9 +233,14 @@ public BytesWithKafkaSchema deserialize(String topic, byte[] payload) {
}
}

private static final class DeferredSchemaPlaceholder extends ByteBufferSchemaWrapper {
static final class DeferredSchemaPlaceholder extends ByteBufferSchemaWrapper {
DeferredSchemaPlaceholder() {
super(SchemaInfo.builder().build());
super(SchemaInfo
.builder()
.type(SchemaType.AVRO)
.properties(Collections.emptyMap())
.schema(new byte[0])
.build());
}
static final DeferredSchemaPlaceholder INSTANCE = new DeferredSchemaPlaceholder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void testNoKeyValueSchema() throws Exception {
ShortDeserializer.class.getName(), Schema.INT16);

validateSchemaNoKeyValue(StringDeserializer.class.getName(), Schema.STRING,
KafkaAvroDeserializer.class.getName(), Schema.AUTO_PRODUCE_BYTES());
KafkaAvroDeserializer.class.getName(), KafkaBytesSource.DeferredSchemaPlaceholder.INSTANCE);

}

Expand All @@ -104,22 +104,22 @@ private void validateSchemaNoKeyValue(String keyDeserializationClass, Schema exp
assertFalse(source.isProduceKeyValue());
Schema keySchema = source.getKeySchema();
Schema valueSchema = source.getValueSchema();
assertEquals(keySchema.getClass(), expectedKeySchema.getClass());
assertEquals(valueSchema.getClass(), expectedValueSchema.getClass());
assertEquals(keySchema.getSchemaInfo().getType(), expectedKeySchema.getSchemaInfo().getType());
assertEquals(valueSchema.getSchemaInfo().getType(), expectedValueSchema.getSchemaInfo().getType());
}

@Test
public void testKeyValueSchema() throws Exception {
validateSchemaKeyValue(IntegerDeserializer.class.getName(), Schema.INT32,
StringDeserializer.class.getName(), Schema.STRING,
new IntegerSerializer().serialize("test", 10),
new StringSerializer().serialize("test", "test"));
ByteBuffer.wrap(new IntegerSerializer().serialize("test", 10)),
ByteBuffer.wrap(new StringSerializer().serialize("test", "test")));
}

private void validateSchemaKeyValue(String keyDeserializationClass, Schema expectedKeySchema,
String valueDeserializationClass, Schema expectedValueSchema,
byte[] key,
byte[] value) throws Exception {
ByteBuffer key,
ByteBuffer value) throws Exception {
KafkaBytesSource source = new KafkaBytesSource();
Map<String, Object> config = new HashMap<>();
config.put("topic","test");
Expand All @@ -134,8 +134,8 @@ private void validateSchemaKeyValue(String keyDeserializationClass, Schema expec
assertTrue(source.isProduceKeyValue());
Schema keySchema = source.getKeySchema();
Schema valueSchema = source.getValueSchema();
assertEquals(keySchema.getClass(), expectedKeySchema.getClass());
assertEquals(valueSchema.getClass(), expectedValueSchema.getClass());
assertEquals(keySchema.getSchemaInfo().getType(), expectedKeySchema.getSchemaInfo().getType());
assertEquals(valueSchema.getSchemaInfo().getType(), expectedValueSchema.getSchemaInfo().getType());

KafkaAbstractSource.KafkaRecord record = source.buildRecord(new ConsumerRecord<Object, Object>("test", 0, 0, key, value));
assertThat(record, instanceOf(KafkaAbstractSource.KeyValueKafkaRecord.class));
Expand All @@ -144,14 +144,14 @@ private void validateSchemaKeyValue(String keyDeserializationClass, Schema expec
assertSame(valueSchema, kvRecord.getValueSchema());
assertEquals(KeyValueEncodingType.SEPARATED, kvRecord.getKeyValueEncodingType());
KeyValue kvValue = (KeyValue) kvRecord.getValue();
log.info("key {}", Arrays.toString(key));
log.info("value {}", Arrays.toString(value));
log.info("key {}", Arrays.toString(toArray(key)));
log.info("value {}", Arrays.toString(toArray(value)));

log.info("key {}", Arrays.toString(toArray((ByteBuffer) kvValue.getKey())));
log.info("value {}", Arrays.toString(toArray((ByteBuffer) kvValue.getValue())));

assertEquals(ByteBuffer.wrap(key).compareTo((ByteBuffer) kvValue.getKey()), 0);
assertEquals(ByteBuffer.wrap(value).compareTo((ByteBuffer) kvValue.getValue()), 0);
assertEquals(ByteBuffer.wrap(toArray(key)).compareTo((ByteBuffer) kvValue.getKey()), 0);
assertEquals(ByteBuffer.wrap(toArray(value)).compareTo((ByteBuffer) kvValue.getValue()), 0);
}

private static byte[] toArray(ByteBuffer b) {
Expand Down

0 comments on commit 63b088a

Please sign in to comment.