Skip to content

Commit

Permalink
Merge pull request #9 from guozhangwang/master
Browse files Browse the repository at this point in the history
allow deserializer override at KStream construction
  • Loading branch information
guozhangwang committed Jul 12, 2015
2 parents c55daa9 + c84a677 commit 980c701
Show file tree
Hide file tree
Showing 21 changed files with 416 additions and 122 deletions.
66 changes: 66 additions & 0 deletions src/main/java/io/confluent/streaming/KStream.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package io.confluent.streaming;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

/**
* KStream is an abstraction of a stream of key-value pairs.
*/
public interface KStream<K, V> {

/**
* Returns the KStreamContext used to create this stream
*/
KStreamContext context();

/**
* Creates a new stream consists of all elements of this stream which satisfy a predicate
* @param predicate the instance of Predicate
Expand Down Expand Up @@ -72,18 +80,76 @@ public interface KStream<K, V> {

/**
* Sends key-value to a topic, also creates a new stream from the topic.
* The created stream is added to the default synchronization group.
* This is equivalent to calling sendTo(topic) and KStreamContext.from(topic).
* @param topic the topic name
* @return KStream
*/
KStream<K, V> through(String topic);

/**
* Sends key-value to a topic, also creates a new stream from the topic.
* The created stream is added to the specified synchronization group.
* This is equivalent to calling sendTo(topic) and KStreamContext.from(topic, syncGroup).
* @param topic the topic name
* @param syncGroup the synchronization group
* @return KStream
*/
KStream<K, V> through(String topic, SyncGroup syncGroup);

/**
* Sends key-value to a topic, also creates a new stream from the topic.
* The created stream is added to the default synchronization group.
* This is equivalent to calling sendTo(topic) and KStreamContext.from(topic).
* @param topic the topic name
* @param keySerializer key serializer used to send key-value pairs,
* if not specified the default serializer defined in the configs will be used
* @param valSerializer value serializer used to send key-value pairs,
* if not specified the default serializer defined in the configs will be used
* @param keyDeserializer key deserializer used to create the new KStream,
* if not specified the default deserializer defined in the configs will be used
* @param valDeserializer value deserializer used to create the new KStream,
* if not specified the default deserializer defined in the configs will be used
*
* @return KStream
*/
<K1, V1> KStream<K1, V1> through(String topic, Serializer<?> keySerializer, Serializer<?> valSerializer, Deserializer<?> keyDeserializer, Deserializer<?> valDeserializer);

/**
* Sends key-value to a topic, also creates a new stream from the topic.
* The created stream is added to the specific synchronization group.
* This is equivalent to calling sendTo(topic) and KStreamContext.from(topic).
* @param topic the topic name
* @param syncGroup the synchronization group
* @param keySerializer key serializer used to send key-value pairs,
* if not specified the default serializer defined in the configs will be used
* @param valSerializer value serializer used to send key-value pairs,
* if not specified the default serializer defined in the configs will be used
* @param keyDeserializer key deserializer used to create the new KStream,
* if not specified the default deserializer defined in the configs will be used
* @param valDeserializer value deserializer used to create the new KStream,
* if not specified the default deserializer defined in the configs will be used
*
* @return KStream
*/
<K1, V1> KStream<K1, V1> through(String topic, SyncGroup syncGroup, Serializer<?> keySerializer, Serializer<?> valSerializer, Deserializer<?> keyDeserializer, Deserializer<?> valDeserializer);

/**
* Sends key-value to a topic.
* @param topic the topic name
*/
void sendTo(String topic);

/**
* Sends key-value to a topic.
* @param topic the topic name
* @param keySerializer key serializer used to send key-value pairs,
* if not specified the default serializer defined in the configs will be used
* @param valSerializer value serializer used to send key-value pairs,
* if not specified the default serializer defined in the configs will be used
*/
void sendTo(String topic, Serializer<?> keySerializer, Serializer<?> valSerializer);

/**
* Processes all elements in this stream by applying a processor.
* @param processor the instance of Processor
Expand Down
24 changes: 21 additions & 3 deletions src/main/java/io/confluent/streaming/KStreamContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,29 @@ public interface KStreamContext {
*/
KStream<?, ?> from(String topic, SyncGroup syncGroup);


/**
* Returns a RecordCollector which takes binary (byte array) key and value.
* @return RecordCollector
* Creates a KStream instance for the specified topic. The stream is added to the default synchronization group.
* @param topic the topic name
* @param keyDeserializer key deserializer used to read this source KStream,
* if not specified the default deserializer defined in the configs will be used
* @param valDeserializer value deserializer used to read this source KStream,
* if not specified the default deserializer defined in the configs will be used
* @return KStream
*/
KStream<?, ?> from(String topic, Deserializer<?> keyDeserializer, Deserializer<?> valDeserializer);

/**
* Creates a KStream instance for the specified topic. The stream is added to the specified synchronization group.
* @param topic the topic name
* @param syncGroup the synchronization group
* @param keyDeserializer key deserializer used to read this source KStream,
* if not specified the default deserializer defined in the configs will be used
* @param valDeserializer value deserializer used to read this source KStream,
* if not specified the default deserializer defined in the configs will be used
* @return KStream
*/
RecordCollector<byte[], byte[]> simpleRecordCollector();
KStream<?, ?> from(String topic, SyncGroup syncGroup, Deserializer<?> keyDeserializer, Deserializer<?> valDeserializer);

/**
* Returns a RecordCollector which applies the serializer to key and value.
Expand Down
11 changes: 8 additions & 3 deletions src/main/java/io/confluent/streaming/StreamingConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,14 @@ public class StreamingConfig {

public StreamingConfig(Properties config) {
this.config = config;
config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
config.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range");
config.setProperty(ProducerConfig.LINGER_MS_CONFIG, "100");
this.config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
this.config.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range");
this.config.setProperty(ProducerConfig.LINGER_MS_CONFIG, "100");
}

@Override
public StreamingConfig clone() {
return new StreamingConfig(this.config);
}

public void addContextObject(String key, Object value) {
Expand Down
17 changes: 1 addition & 16 deletions src/main/java/io/confluent/streaming/internal/IngestorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void poll(long timeoutMs) {
StreamSynchronizer streamSynchronizer = streamSynchronizers.get(partition);

if (streamSynchronizer != null)
streamSynchronizer.addRecords(partition, new DeserializingIterator(records.records(partition).iterator()));
streamSynchronizer.addRecords(partition, records.records(partition).iterator());
else
log.warn("unused topic: " + partition.topic());
}
Expand Down Expand Up @@ -112,19 +112,4 @@ public void clear() {
toBePaused.clear();
streamSynchronizers.clear();
}

private class DeserializingIterator extends FilteredIterator<ConsumerRecord<Object, Object>, ConsumerRecord<byte[], byte[]>> {

DeserializingIterator(Iterator<ConsumerRecord<byte[], byte[]>> inner) {
super(inner);
}

protected ConsumerRecord<Object, Object> filter(ConsumerRecord<byte[], byte[]> record) {
Object key = keyDeserializer.deserialize(record.topic(), record.key());
Object value = valueDeserializer.deserialize(record.topic(), record.value());
return new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), key, value);
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.confluent.streaming.internal;

import io.confluent.streaming.KStreamContext;
import io.confluent.streaming.Predicate;

import java.lang.reflect.Array;
Expand All @@ -14,7 +15,7 @@ class KStreamBranch<K, V> implements Receiver {
final KStreamSource<K, V>[] branches;

@SuppressWarnings("unchecked")
KStreamBranch(Predicate<K, V>[] predicates, PartitioningInfo partitioningInfo, KStreamContextImpl context) {
KStreamBranch(Predicate<K, V>[] predicates, PartitioningInfo partitioningInfo, KStreamContext context) {
this.predicates = Arrays.copyOf(predicates, predicates.length);
this.branches = (KStreamSource<K, V>[]) Array.newInstance(KStreamSource.class, predicates.length);
for (int i = 0; i < branches.length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public class KStreamContextImpl implements KStreamContext {
private final ProcessorStateManager stateMgr;
private Consumer<byte[], byte[]> restoreConsumer;

@SuppressWarnings("unchecked")
public KStreamContextImpl(int id,
KStreamJob job,
Set<String> topics,
Expand All @@ -52,14 +51,33 @@ public KStreamContextImpl(int id,
StreamingConfig streamingConfig,
ProcessorConfig processorConfig,
Metrics metrics) {

this(id, job, topics, ingestor,
new RecordCollectors.SimpleRecordCollector(producer),
coordinator, streamingConfig, processorConfig,
new ProcessorStateManager(id, new File(processorConfig.stateDir, Integer.toString(id))),
metrics);
}

@SuppressWarnings("unchecked")
public KStreamContextImpl(int id,
KStreamJob job,
Set<String> topics,
Ingestor ingestor,
RecordCollectors.SimpleRecordCollector simpleCollector,
Coordinator coordinator,
StreamingConfig streamingConfig,
ProcessorConfig processorConfig,
ProcessorStateManager stateMgr,
Metrics metrics) {
this.id = id;
this.job = job;
this.topics = topics;
this.ingestor = ingestor;

this.simpleCollector = new RecordCollectors.SimpleRecordCollector(producer);
this.collector = new RecordCollectors.SerializingRecordCollector<Object, Object>(
simpleCollector, (Serializer<Object>) streamingConfig.keySerializer(), (Serializer<Object>) streamingConfig.valueSerializer());
this.simpleCollector = simpleCollector;
this.collector = new RecordCollectors.SerializingRecordCollector(
simpleCollector, streamingConfig.keySerializer(), streamingConfig.valueSerializer());

this.coordinator = coordinator;
this.streamingConfig = streamingConfig;
Expand All @@ -68,11 +86,13 @@ public KStreamContextImpl(int id,
this.timestampExtractor = this.streamingConfig.timestampExtractor();
if (this.timestampExtractor == null) throw new NullPointerException("timestamp extractor is missing");

this.stateDir = new File(processorConfig.stateDir, Integer.toString(id));
this.stateMgr = new ProcessorStateManager(id, stateDir);
this.stateMgr = stateMgr;
this.stateDir = this.stateMgr.baseDir();
this.metrics = metrics;
}

public RecordCollectors.SimpleRecordCollector simpleRecordCollector() { return this.simpleCollector; }

@Override
public int id() {
return id;
Expand Down Expand Up @@ -100,12 +120,22 @@ public Deserializer<?> valueDeserializer() {

@Override
public KStream<?, ?> from(String topic) {
return from(topic, syncGroup(DEFAULT_SYNCHRONIZATION_GROUP));
return from(topic, syncGroup(DEFAULT_SYNCHRONIZATION_GROUP), null, null);
}

@Override
@SuppressWarnings("unchecked")
public KStream<?, ?> from(String topic, SyncGroup syncGroup) {
return from(topic, syncGroup, null, null);
}

@Override
public KStream<?, ?> from(String topic, Deserializer<?> keyDeserializer, Deserializer<?> valDeserializer) {
return from(topic, syncGroup(DEFAULT_SYNCHRONIZATION_GROUP), keyDeserializer, valDeserializer);
}

@Override
@SuppressWarnings("unchecked")
public KStream<?, ?> from(String topic, SyncGroup syncGroup, Deserializer<?> keyDeserializer, Deserializer<?> valDeserializer) {
if (syncGroup == null) throw new NullPointerException();

synchronized (this) {
Expand All @@ -122,7 +152,24 @@ public Deserializer<?> valueDeserializer() {
partitioningInfos.put(topic, partitioningInfo);
}

stream = new KStreamSource<Object, Object>(partitioningInfo, this);
// override the deserializer classes if specified
if (keyDeserializer == null && valDeserializer == null) {
stream = new KStreamSource<Object, Object>(partitioningInfo, this);
} else {
StreamingConfig newConfig = this.streamingConfig.clone();
if (keyDeserializer != null)
newConfig.keyDeserializer(keyDeserializer);
if (valDeserializer != null)
newConfig.valueDeserializer(valDeserializer);

KStreamContextImpl newContext = new KStreamContextImpl(
this.id, this.job, this.topics, this.ingestor,
this.simpleCollector, this.coordinator,
newConfig, this.processorConfig,
this.stateMgr, this.metrics);
stream = new KStreamSource<Object, Object>(partitioningInfo, newContext);
}

sourceStreams.put(topic, stream);

TopicPartition partition = new TopicPartition(topic, id);
Expand All @@ -133,17 +180,19 @@ public Deserializer<?> valueDeserializer() {
else {
if (stream.partitioningInfo.syncGroup == syncGroup)
throw new IllegalStateException("topic is already assigned a different synchronization group");

// TODO: with this constraint we will not allow users to create KStream with different
// deser from the same topic, this constraint may better be relaxed later.
if (keyDeserializer != null && !keyDeserializer.getClass().equals(this.keyDeserializer().getClass()))
throw new IllegalStateException("another source stream with the same topic but different key deserializer is already created");
if (valDeserializer != null && !valDeserializer.getClass().equals(this.valueDeserializer().getClass()))
throw new IllegalStateException("another source stream with the same topic but different value deserializer is already created");
}

return stream;
}
}

@Override
public RecordCollector<byte[], byte[]> simpleRecordCollector() {
return simpleCollector;
}

@Override
public RecordCollector<Object, Object> recordCollector() {
return collector;
Expand Down Expand Up @@ -189,7 +238,7 @@ private SyncGroup syncGroup(String name, Chooser chooser) {
new StreamSynchronizer(name, ingestor, chooser, timestampExtractor, desiredUnprocessedPerPartition);
streamSynchronizerMap.put(name, streamSynchronizer);
}
return (SyncGroup)streamSynchronizer;
return streamSynchronizer;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package io.confluent.streaming.internal;

import io.confluent.streaming.KStreamContext;
import io.confluent.streaming.Predicate;

/**
* Created by yasuhiro on 6/17/15.
*/
class KStreamFilter<K, V> extends KStreamImpl<K, V, K, V> {
class KStreamFilter<K, V> extends KStreamImpl<K, V> {

private final Predicate<K, V> predicate;

KStreamFilter(Predicate<K, V> predicate, PartitioningInfo partitioningInfo, KStreamContextImpl context) {
KStreamFilter(Predicate<K, V> predicate, PartitioningInfo partitioningInfo, KStreamContext context) {
super(partitioningInfo, context);
this.predicate = predicate;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package io.confluent.streaming.internal;

import io.confluent.streaming.KStreamContext;
import io.confluent.streaming.KeyValueMapper;
import io.confluent.streaming.KeyValue;
import io.confluent.streaming.SyncGroup;

/**
* Created by yasuhiro on 6/17/15.
*/
class KStreamFlatMap<K, V, K1, V1> extends KStreamImpl<K, V, K1, V1> {
class KStreamFlatMap<K, V, K1, V1> extends KStreamImpl<K, V> {

private final KeyValueMapper<K, ? extends Iterable<V>, K1, V1> mapper;

KStreamFlatMap(KeyValueMapper<K, ? extends Iterable<V>, K1, V1> mapper, SyncGroup syncGroup, KStreamContextImpl context) {
KStreamFlatMap(KeyValueMapper<K, ? extends Iterable<V>, K1, V1> mapper, SyncGroup syncGroup, KStreamContext context) {
super(PartitioningInfo.unjoinable(syncGroup), context);
this.mapper = mapper;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package io.confluent.streaming.internal;

import io.confluent.streaming.KStreamContext;
import io.confluent.streaming.ValueMapper;

/**
* Created by yasuhiro on 6/17/15.
*/
class KStreamFlatMapValues<K, V, V1> extends KStreamImpl<K, V, K, V1> {
class KStreamFlatMapValues<K, V, V1> extends KStreamImpl<K, V> {

private final ValueMapper<? extends Iterable<V>, V1> mapper;

KStreamFlatMapValues(ValueMapper<? extends Iterable<V>, V1> mapper, PartitioningInfo partitioningInfo, KStreamContextImpl context) {
KStreamFlatMapValues(ValueMapper<? extends Iterable<V>, V1> mapper, PartitioningInfo partitioningInfo, KStreamContext context) {
super(partitioningInfo, context);
this.mapper = mapper;
}
Expand Down
Loading

0 comments on commit 980c701

Please sign in to comment.