Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor TopologyBuilder #8

Merged
merged 2 commits into from
Aug 26, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,155 +27,172 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TopologyBuilder {

private Map<String, ProcessorClazz> processorClasses = new HashMap<>();
private Map<String, SourceClazz> sourceClasses = new HashMap<>();
private Map<String, SinkClazz> sinkClasses = new HashMap<>();
private Map<String, String> topicsToSourceNames = new HashMap<>();
private Map<String, String> topicsToSinkNames = new HashMap<>();
// list of node factories in a topological order
private ArrayList<NodeFactory> nodeFactories = new ArrayList<>();

private Map<String, List<String>> parents = new HashMap<>();
private Map<String, List<String>> children = new HashMap<>();
private Set<String> nodeNames = new HashSet<>();
private Set<String> sourceTopicNames = new HashSet<>();

private class ProcessorClazz {
public ProcessorFactory factory;
/**
 * Factory for a single node of the topology.
 *
 * Implemented in this file by ProcessorNodeFactory, SourceNodeFactory and
 * SinkNodeFactory; the builder keeps the factories in a list and calls
 * build() on each of them when the topology is built.
 */
private interface NodeFactory {
/**
 * Creates the node described by this factory.
 *
 * @return the newly created node
 */
ProcessorNode build();
}

private class ProcessorNodeFactory implements NodeFactory {
public final String[] parents;
private final String name;
private final ProcessorFactory factory;

public ProcessorClazz(ProcessorFactory factory) {
public ProcessorNodeFactory(String name, String[] parents, ProcessorFactory factory) {
this.name = name;
this.parents = parents.clone();
this.factory = factory;
}

public ProcessorNode build() {
Processor processor = factory.build();
return new ProcessorNode(name, processor);
}
}

private class SourceClazz {
public Deserializer keyDeserializer;
public Deserializer valDeserializer;
private class SourceNodeFactory implements NodeFactory {
public final String[] topics;
private final String name;
private Deserializer keyDeserializer;
private Deserializer valDeserializer;

private SourceClazz(Deserializer keyDeserializer, Deserializer valDeserializer) {
private SourceNodeFactory(String name, String[] topics, Deserializer keyDeserializer, Deserializer valDeserializer) {
this.name = name;
this.topics = topics.clone();
this.keyDeserializer = keyDeserializer;
this.valDeserializer = valDeserializer;
}
}

private class SinkClazz {
public Serializer keySerializer;
public Serializer valSerializer;
public ProcessorNode build() {
return new SourceNode(name, keyDeserializer, valDeserializer);
}
}

private SinkClazz(Serializer keySerializer, Serializer valSerializer) {
/**
 * Factory for a sink node: remembers the node name, the names of its parent
 * nodes, the topic it writes to and the key/value serializers, and creates
 * the corresponding SinkNode on demand.
 */
private class SinkNodeFactory implements NodeFactory {
    /** Names of the parent nodes; never null (empty when the sink has no parents). */
    public final String[] parents;
    /** Topic the sink node is created for. */
    public final String topic;
    private final String name;
    private final Serializer keySerializer;
    private final Serializer valSerializer;

    /**
     * @param name          name of the sink node
     * @param parents       names of the parent nodes; may be null, which is treated as "no parents"
     * @param topic         topic passed to the created SinkNode
     * @param keySerializer serializer passed to the created SinkNode for keys
     * @param valSerializer serializer passed to the created SinkNode for values
     */
    private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer) {
        this.name = name;
        // The caller guards against a null parent array before validating it, so a
        // null can reach this point; cloning it directly would throw a
        // NullPointerException. Normalise to an empty array so that iterating
        // over "parents" is always safe.
        this.parents = (parents == null) ? new String[0] : parents.clone();
        this.topic = topic;
        this.keySerializer = keySerializer;
        this.valSerializer = valSerializer;
    }

    /**
     * Creates the sink node described by this factory.
     *
     * @return a new SinkNode built from the stored name, topic and serializers
     */
    public ProcessorNode build() {
        return new SinkNode(name, topic, keySerializer, valSerializer);
    }
}

public TopologyBuilder() {}

@SuppressWarnings("unchecked")
public final void addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
if (nodeNames.contains(name))
throw new IllegalArgumentException("Processor " + name + " is already added.");

for (String topic : topics) {
if (topicsToSourceNames.containsKey(topic))
if (sourceTopicNames.contains(topic))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

topics are not added to sourceTopicNames.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

throw new IllegalArgumentException("Topic " + topic + " has already been registered by another processor.");

topicsToSourceNames.put(topic, name);
sourceTopicNames.add(topic);
}

sourceClasses.put(name, new SourceClazz(keyDeserializer, valDeserializer));
nodeNames.add(name);
nodeFactories.add(new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
}

public final void addSink(String name, Serializer keySerializer, Serializer valSerializer, String... topics) {
for (String topic : topics) {
if (topicsToSinkNames.containsKey(topic))
throw new IllegalArgumentException("Topic " + topic + " has already been registered by another processor.");
public final void addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
if (nodeNames.contains(name))
throw new IllegalArgumentException("Processor " + name + " is already added.");

topicsToSinkNames.put(topic, name);
if (parentNames != null) {
for (String parent : parentNames) {
if (parent.equals(name)) {
throw new IllegalArgumentException("Processor " + name + " cannot be a parent of itself");
}
if (!nodeNames.contains(parent)) {
throw new IllegalArgumentException("Parent processor " + parent + " is not added yet.");
}
}
}

sinkClasses.put(name, new SinkClazz(keySerializer, valSerializer));
nodeNames.add(name);
nodeFactories.add(new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer));
}

public final void addProcessor(String name, ProcessorFactory factory, String... parentNames) {
if (processorClasses.containsKey(name))
if (nodeNames.contains(name))
throw new IllegalArgumentException("Processor " + name + " is already added.");

processorClasses.put(name, new ProcessorClazz(factory));

if (parentNames != null) {
for (String parent : parentNames) {
if (!processorClasses.containsKey(parent))
if (parent.equals(name)) {
throw new IllegalArgumentException("Processor " + name + " cannot be a parent of itself");
}
if (!nodeNames.contains(parent)) {
throw new IllegalArgumentException("Parent processor " + parent + " is not added yet.");

// add to parent list
if (!parents.containsKey(name))
parents.put(name, new ArrayList<>());
parents.get(name).add(parent);

// add to children list
if (!children.containsKey(parent))
children.put(parent, new ArrayList<>());
children.get(parent).add(name);
}
}
}

nodeNames.add(name);
nodeFactories.add(new ProcessorNodeFactory(name, parentNames, factory));
}

/**
* Build the topology by creating the processors
*/
@SuppressWarnings("unchecked")
public ProcessorTopology build() {
List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
Map<String, ProcessorNode> processorMap = new HashMap<>();
Map<String, SourceNode> topicSourceMap = new HashMap<>();
Map<String, SinkNode> topicSinkMap = new HashMap<>();

// create sources
for (String name : sourceClasses.keySet()) {
Deserializer keyDeserializer = sourceClasses.get(name).keyDeserializer;
Deserializer valDeserializer = sourceClasses.get(name).valDeserializer;
SourceNode node = new SourceNode(name, keyDeserializer, valDeserializer);
processorMap.put(name, node);
}

// create sinks
for (String name : sinkClasses.keySet()) {
Serializer keySerializer = sinkClasses.get(name).keySerializer;
Serializer valSerializer = sinkClasses.get(name).valSerializer;
SinkNode node = new SinkNode(name, keySerializer, valSerializer);
processorMap.put(name, node);
}

// create processors
try {
for (String name : processorClasses.keySet()) {
ProcessorFactory processorFactory = processorClasses.get(name).factory;
Processor processor = processorFactory.build();
ProcessorNode node = new ProcessorNode(name, processor);
processorMap.put(name, node);
// create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
for (NodeFactory factory : nodeFactories) {
ProcessorNode node = factory.build();
processorNodes.add(node);
processorMap.put(node.name(), node);

if (factory instanceof ProcessorNodeFactory) {
for (String parent : ((ProcessorNodeFactory) factory).parents) {
processorMap.get(parent).chain(node);
}
} else if (factory instanceof SourceNodeFactory) {
for (String topic : ((SourceNodeFactory)factory).topics) {
topicSourceMap.put(topic, (SourceNode) node);
}
} else if (factory instanceof SinkNodeFactory) {
String topic = ((SinkNodeFactory) factory).topic;
topicSinkMap.put(topic, (SinkNode) node);

for (String parent : ((SinkNodeFactory) factory).parents) {
processorMap.get(parent).chain(node);
}
} else {
throw new IllegalStateException("unknown factory class: " + factory.getClass().getName());
}
}
} catch (Exception e) {
throw new KafkaException("Processor(String) constructor failed: this should not happen.");
}

// construct topics to sources map
for (String topic : topicsToSourceNames.keySet()) {
SourceNode node = (SourceNode) processorMap.get(topicsToSourceNames.get(topic));
topicSourceMap.put(topic, node);
}

// construct topics to sinks map
for (String topic : topicsToSinkNames.keySet()) {
SinkNode node = (SinkNode) processorMap.get(topicsToSourceNames.get(topic));
topicSinkMap.put(topic, node);
node.addTopic(topic);
}

// chain children to parents to build the DAG
for (ProcessorNode node : processorMap.values()) {
for (String child : children.get(node.name())) {
ProcessorNode childNode = processorMap.get(child);
node.chain(childNode);
}
throw new KafkaException("ProcessorNode construction failed: this should not happen.");
}

return new ProcessorTopology(processorMap, topicSourceMap, topicSinkMap);
return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ public class ProcessorNode<K1, V1, K2, V2> {
private final String name;
private final Processor<K1, V1> processor;

public boolean initialized;

public ProcessorNode(String name) {
this(name, null);
}
Expand All @@ -42,8 +40,6 @@ public ProcessorNode(String name, Processor<K1, V1> processor) {
this.processor = processor;
this.parents = new ArrayList<>();
this.children = new ArrayList<>();

this.initialized = false;
}

public String name() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,88 +27,60 @@

public class ProcessorTopology {

private Map<String, ProcessorNode> processors = new HashMap<>();
private Map<String, SourceNode> sourceTopics = new HashMap<>();
private Map<String, SinkNode> sinkTopics = new HashMap<>();
private List<ProcessorNode> processors;
private Map<String, SourceNode> sourceByTopics;
private Map<String, SinkNode> sinkByTopics;

public ProcessorTopology(Map<String, ProcessorNode> processors,
Map<String, SourceNode> sourceTopics,
Map<String, SinkNode> sinkTopics) {
public ProcessorTopology(List<ProcessorNode> processors,
Map<String, SourceNode> sourceByTopics,
Map<String, SinkNode> sinkByTopics) {
this.processors = processors;
this.sourceTopics = sourceTopics;
this.sinkTopics = sinkTopics;
this.sourceByTopics = sourceByTopics;
this.sinkByTopics = sinkByTopics;
}

public Set<String> sourceTopics() {
return sourceTopics.keySet();
return sourceByTopics.keySet();
}

public Set<String> sinkTopics() {
return sinkTopics.keySet();
return sinkByTopics.keySet();
}

public SourceNode source(String topic) {
return sourceTopics.get(topic);
return sourceByTopics.get(topic);
}

public SinkNode sink(String topic) {
return sinkTopics.get(topic);
return sinkByTopics.get(topic);
}

public Collection<SourceNode> sources() {
return sourceTopics.values();
return sourceByTopics.values();
}

public Collection<SinkNode> sinks() {
return sinkTopics.values();
return sinkByTopics.values();
}

/**
* Initialize the processors following the DAG reverse ordering

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edit the comments accordingly.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment is still correct.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not the DAG reverse ordering but just the topological ordering, right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be missing something. What do you mean by DAG reverse ordering exactly?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad: in an older version of the code I tried to do the initialization recursively, starting at the sink nodes and calling parent.init() before initializing the node itself. I later dropped that idea but did not update the comments. Will fix it in #9.

* such that parents are always initialized before children
*/
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
// initialize sources
for (String topic : sourceTopics.keySet()) {
SourceNode source = sourceTopics.get(topic);

init(source, context);
}
}

/**
* Initialize the current processor node by first initializing
* its parent nodes first, then the processor itself
*/
@SuppressWarnings("unchecked")
private void init(ProcessorNode node, ProcessorContext context) {
for (ProcessorNode parentNode : (List<ProcessorNode>) node.parents()) {
if (!parentNode.initialized) {
init(parentNode, context);
}
}

node.init(context);
node.initialized = true;

// try to initialize its children
for (ProcessorNode childNode : (List<ProcessorNode>) node.children()) {
if (!childNode.initialized) {
init(childNode, context);
}
for (ProcessorNode node : processors) {
node.init(context);
}
}

public final void close() {
// close the processors
// TODO: do we need to follow the DAG ordering?
for (ProcessorNode processorNode : processors.values()) {
processorNode.close();
for (ProcessorNode node : processors) {
node.close();
}

processors.clear();
sourceTopics.clear();
sinkTopics.clear();
sourceByTopics.clear();
sinkByTopics.clear();
}
}
Loading