
Commit

Clean up
eolivelli committed Mar 22, 2021
1 parent 61804d9 commit 31b3ba9
Showing 6 changed files with 94 additions and 208 deletions.
@@ -28,7 +28,6 @@
/**
* This is a ByteBuffer schema that reports SchemaInfo from another Schema instance.
*/
@Value
public class ByteBufferSchemaWrapper implements Schema<ByteBuffer> {
private final Supplier<SchemaInfo> original;

@@ -61,7 +60,7 @@ public ByteBuffer decode(byte[] bytes, byte[] schemaVersion) {
}

public String toString() {
return "{schema wrapper for "+ original +"}";
return original.get().toString();
}

static byte[] getBytes(ByteBuffer buffer) {
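The Javadoc above says the wrapper reports SchemaInfo from another Schema instance while leaving the payload untouched. A rough usage sketch, assuming the SchemaInfo-based constructor that DeferredSchemaPlaceholder uses further down is accessible; the schema definition and class name here are made up for illustration:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

public class ByteBufferSchemaWrapperSketch {
    public static void main(String[] args) {
        // hypothetical Avro definition, e.g. downloaded from a schema registry
        SchemaInfo avroInfo = SchemaInfo.builder()
                .name("user")
                .type(SchemaType.AVRO)
                .schema("{\"type\":\"record\",\"name\":\"User\",\"fields\":[]}"
                        .getBytes(StandardCharsets.UTF_8))
                .build();
        Schema<ByteBuffer> wrapped = new ByteBufferSchemaWrapper(avroInfo);
        // the wrapper advertises the Avro SchemaInfo...
        System.out.println(wrapped.getSchemaInfo().getType());
        // ...while encode() is a plain byte pass-through
        byte[] encoded = wrapped.encode(ByteBuffer.wrap(new byte[]{1, 2, 3}));
        System.out.println(encoded.length);
    }
}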
@@ -155,20 +155,7 @@ public void start() {
runnerThread.start();
}

public KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord) {
KafkaRecord record = new KafkaRecord(consumerRecord,
extractValue(consumerRecord),
extractSchema(consumerRecord));
return record;
}

public Object extractValue(ConsumerRecord<Object, Object> consumerRecord) {
return consumerRecord.value();
}

public Schema<V> extractSchema(ConsumerRecord<Object, Object> consumerRecord) {
throw new UnsupportedOperationException();
}
public abstract KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord);

@Slf4j
protected static class KafkaRecord<V> implements Record<V> {
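With this change buildRecord is abstract, so every concrete source decides how a Kafka ConsumerRecord is turned into a Pulsar Record. A minimal sketch of such a subclass, assuming the value was read with a byte-array deserializer; the class name is hypothetical, and KafkaStringSource further down is the real implementation touched by this commit:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.pulsar.client.api.Schema;

public class KafkaRawBytesSource extends KafkaAbstractSource<byte[]> {

    @Override
    public KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord) {
        // pick the value to emit and declare the Pulsar schema that describes it
        return new KafkaRecord(consumerRecord,
                (byte[]) consumerRecord.value(),
                Schema.BYTES);
    }
}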
@@ -32,6 +32,7 @@
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.FloatDeserializer;
@@ -42,17 +43,27 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;

/**
* Simple Kafka Source that just transfers the value part of the kafka records
* as Strings
* Kafka Source that transfers the data from Kafka to Pulsar and sets the Schema type properly.
* We use the key and the value deserializers to decide the type of Schema to set on the Pulsar topic.
* In case of KafkaAvroDeserializer we use the Schema Registry to download the schema and apply it to the topic.
* Please refer to {@link #getSchemaFromDeserializerAndAdaptConfiguration(String, Properties, boolean)} for the list
* of supported Deserializers.
* If you set StringDeserializer for the key then we use the raw key as key for the Pulsar message.
* If you set another Deserializer for the key we use the KeyValue schema type in Pulsar with the SEPARATED encoding.
* This way the Key is stored in the Pulsar key, encoded as a base64 string and with a Schema, and the Value of the message
* is stored in the Pulsar value with a Schema.
* This way there is a one-to-one mapping between Kafka key/value pair and the Pulsar data model.
*/
@Connector(
name = "kafka",
type = IOType.SOURCE,
help = "The KafkaBytesSource is used for moving messages from Kafka to Pulsar.",
help = "Transfer data from Kafka to Pulsar.",
configClass = KafkaSourceConfig.class
)
@Slf4j
@@ -72,17 +83,21 @@ protected Properties beforeCreateConsumer(Properties props) {
keySchema = getSchemaFromDeserializerAndAdaptConfiguration(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, props, true);
valueSchema = getSchemaFromDeserializerAndAdaptConfiguration(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, props, false);

boolean needsSchemaCache = keySchema instanceof AutoProduceBytesSchema
|| valueSchema instanceof AutoProduceBytesSchema;
boolean needsSchemaCache = keySchema == DeferredSchemaPlaceholder.INSTANCE
|| valueSchema == DeferredSchemaPlaceholder.INSTANCE;

if (needsSchemaCache) {
initSchemaCache(props);
}

if (keySchema != Schema.STRING) {
if (keySchema.getSchemaInfo().getType() != SchemaType.STRING) {
// if the Key is a String we can use native Pulsar Key
// otherwise we use KeyValue schema
// that allows you to set a schema for the Key and a schema for the Value.
// using SEPARATED encoding the key is saved into the binary key
// so it is used for routing and for compaction
produceKeyValue = true;
}
log.info("keySchema {}, valueSchema {} useKV {}", keySchema, valueSchema, produceKeyValue);

return props;
}
@@ -92,6 +107,7 @@ private void initSchemaCache(Properties props) {
List<String> urls = config.getSchemaRegistryUrls();
int maxSchemaObject = config.getMaxSchemasPerSubject();
SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(urls, maxSchemaObject);
log.info("initializing SchemaRegistry Client, urls:{}, maxSchemasPerSubject: {}", urls, maxSchemaObject);
schemaCache = new AvroSchemaCache(schemaRegistryClient);
}

@@ -102,13 +118,6 @@ public KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord) {
Object value = extractSimpleValue(consumerRecord.value());
Schema currentKeySchema = getSchemaFromObject(consumerRecord.key(), keySchema);
Schema currentValueSchema = getSchemaFromObject(consumerRecord.value(), valueSchema);
log.info("buildKVRecord {} {} {} {}", key, value, currentKeySchema, currentValueSchema);
if (currentKeySchema instanceof AutoProduceBytesSchema) {
throw new RuntimeException();
}
if (currentValueSchema instanceof AutoProduceBytesSchema) {
throw new RuntimeException();
}
return new KeyValueKafkaRecord(consumerRecord,
new KeyValue<>(key, value),
currentKeySchema,
@@ -124,12 +133,12 @@ public KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord) {
}

private static ByteBuffer extractSimpleValue(Object value) {
// we have substituted the original Deserializer with
// ByteBufferDeserializer in order to save memory copies
if (value == null) {
return null;
} else if (value instanceof BytesWithKafkaSchema) {
return ((BytesWithKafkaSchema) value).getValue();
} else if (value instanceof byte[]) {
return ByteBuffer.wrap((byte[]) value);
} else if (value instanceof ByteBuffer) {
return (ByteBuffer) value;
} else {
@@ -147,39 +156,23 @@ private Schema<ByteBuffer> getSchemaFromObject(Object value, Schema fallback) {
}
}

public static class ExtractKafkaAvroSchemaDeserializer implements Deserializer<BytesWithKafkaSchema> {

@Override
public BytesWithKafkaSchema deserialize(String topic, byte[] payload) {
if (payload == null) {
return null;
} else {
try {
ByteBuffer buffer = ByteBuffer.wrap(payload);
buffer.get(); // magic number
int id = buffer.getInt();
return new BytesWithKafkaSchema(buffer, id);
} catch (Exception err) {
throw new SerializationException("Error deserializing Avro message", err);
}
}
}
}

private static Schema<?> getSchemaFromDeserializerAndAdaptConfiguration(String key, Properties props, boolean isKey) {
String kafkaDeserializerClass = props.getProperty(key);
Objects.requireNonNull(kafkaDeserializerClass);

// we want to simply transfer the bytes
// we want to simply transfer the bytes,
// by default we override the Kafka Consumer configuration
// to pass the original ByteBuffer
props.put(key, ByteBufferDeserializer.class.getCanonicalName());

if (ByteArrayDeserializer.class.getName().equals(kafkaDeserializerClass)) {
return Schema.BYTEBUFFER;
} else if (ByteBufferDeserializer.class.getName().equals(kafkaDeserializerClass)) {
if (ByteArrayDeserializer.class.getName().equals(kafkaDeserializerClass)
|| ByteBufferDeserializer.class.getName().equals(kafkaDeserializerClass)
|| BytesDeserializer.class.getName().equals(kafkaDeserializerClass)) {
return Schema.BYTEBUFFER;
} else if (StringDeserializer.class.getName().equals(kafkaDeserializerClass)) {
if (isKey) {
props.put(key, StringDeserializer.class.getCanonicalName());
// for the key we keep the String
props.put(key, kafkaDeserializerClass);
}
return Schema.STRING;
} else if (DoubleDeserializer.class.getName().equals(kafkaDeserializerClass)) {
@@ -196,7 +189,9 @@ private static Schema<?> getSchemaFromDeserializerAndAdaptConfiguration(String k
// in this case we have to inject our custom deserializer
// that extracts Avro schema information
props.put(key, ExtractKafkaAvroSchemaDeserializer.class.getName());
return Schema.AUTO_PRODUCE_BYTES();
// this is only a placeholder: we are not really using AUTO_PRODUCE_BYTES,
// the schema is created by downloading the definition from the Schema Registry
return DeferredSchemaPlaceholder.INSTANCE;
} else {
throw new IllegalArgumentException("Unsupported deserializer "+kafkaDeserializerClass);
}
@@ -214,4 +209,31 @@ boolean isProduceKeyValue() {
return produceKeyValue;
}


public static class ExtractKafkaAvroSchemaDeserializer implements Deserializer<BytesWithKafkaSchema> {

@Override
public BytesWithKafkaSchema deserialize(String topic, byte[] payload) {
if (payload == null) {
return null;
} else {
try {
ByteBuffer buffer = ByteBuffer.wrap(payload);
buffer.get(); // magic number
int id = buffer.getInt();
return new BytesWithKafkaSchema(buffer, id);
} catch (Exception err) {
throw new SerializationException("Error deserializing Avro message", err);
}
}
}
}

private static final class DeferredSchemaPlaceholder extends ByteBufferSchemaWrapper {
DeferredSchemaPlaceholder() {
super(SchemaInfo.builder().build());
}
static final DeferredSchemaPlaceholder INSTANCE = new DeferredSchemaPlaceholder();
}

}
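The class Javadoc above explains the mapping this source applies: a String Kafka key becomes the native Pulsar key, any other key is carried through a KeyValue schema with SEPARATED encoding, and Avro payloads get their schema from the Schema Registry via the placeholder below. A rough sketch of what a downstream Pulsar consumer of such KeyValue records might look like; the service URL, topic name, subscription name, and the Integer/byte[] types are assumptions, not part of this change:

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;

public class KeyValueConsumerSketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // assumed broker address
                .build();
        // SEPARATED encoding: the key travels in the Pulsar message key (base64 encoded),
        // the value in the payload, each with its own schema.
        Consumer<KeyValue<Integer, byte[]>> consumer = client
                .newConsumer(Schema.KeyValue(Schema.INT32, Schema.BYTES,
                        KeyValueEncodingType.SEPARATED))
                .topic("persistent://public/default/my-kafka-topic") // assumed topic
                .subscriptionName("kafka-source-reader")
                .subscribe();
        Message<KeyValue<Integer, byte[]>> msg = consumer.receive();
        KeyValue<Integer, byte[]> kv = msg.getValue();
        System.out.println("key=" + kv.getKey() + ", value bytes=" + kv.getValue().length);
        consumer.acknowledge(msg);
        client.close();
    }
}

Because the encoding is SEPARATED, the key stays in the Pulsar message key, so it keeps working for routing and topic compaction, as the comment in beforeCreateConsumer points out.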
@@ -29,13 +29,13 @@
*/
public class KafkaStringSource extends KafkaAbstractSource<String> {

@Override
public Object extractValue(ConsumerRecord<Object, Object> consumerRecord) {
return new String((byte[]) consumerRecord.value(), StandardCharsets.UTF_8);
}

@Override
public Schema<String> extractSchema(ConsumerRecord<Object, Object> consumerRecord) {
return Schema.STRING;
public KafkaRecord buildRecord(ConsumerRecord<Object, Object> consumerRecord) {
KafkaRecord record = new KafkaRecord(consumerRecord,
new String((byte[]) consumerRecord.value(), StandardCharsets.UTF_8),
Schema.STRING);
return record;
}

}
@@ -31,6 +31,8 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.FloatDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -61,13 +63,10 @@ public void testNoKeyValueSchema() throws Exception {
StringDeserializer.class.getName(), Schema.STRING);

validateSchemaNoKeyValue(StringDeserializer.class.getName(), Schema.STRING,
ByteArrayDeserializer.class.getName(), Schema.BYTEBUFFER);
ByteBufferDeserializer.class.getName(), Schema.BYTEBUFFER);

validateSchemaNoKeyValue(StringDeserializer.class.getName(), Schema.STRING,
ByteArrayDeserializer.class.getName(), Schema.BYTEBUFFER);

validateSchemaNoKeyValue(StringDeserializer.class.getName(), Schema.STRING,
DoubleDeserializer.class.getName(), Schema.DOUBLE);
BytesDeserializer.class.getName(), Schema.BYTEBUFFER);

validateSchemaNoKeyValue(StringDeserializer.class.getName(), Schema.STRING,
DoubleDeserializer.class.getName(), Schema.DOUBLE);