Skip to content

Commit

Permalink
KAFKA-3856; Cleanup Kafka Stream builder API (KIP-120)
Browse files Browse the repository at this point in the history
Author: Matthias J. Sax <[email protected]>

Reviewers: Damian Guy <[email protected]>, Bill Bejeck <[email protected]>

Closes apache#2301 from mjsax/kafka-3856-topology-builder-API
  • Loading branch information
mjsax authored and dguy committed Jul 20, 2017
1 parent e0099e1 commit b04bed0
Show file tree
Hide file tree
Showing 4 changed files with 1,012 additions and 17 deletions.
4 changes: 2 additions & 2 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@

<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
files="(KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/>
files="(TopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/>

<suppress checks="MethodLength"
files="StreamPartitionAssignor.java"/>
Expand All @@ -128,7 +128,7 @@
files="RocksDBWindowStoreSupplier.java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(KStreamImpl|StreamPartitionAssignor|KafkaStreams|KTableImpl).java"/>
files="(TopologyBuilder|KStreamImpl|StreamPartitionAssignor|KafkaStreams|KTableImpl).java"/>

<suppress checks="CyclomaticComplexity"
files="TopologyBuilder.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,23 +144,25 @@ private static class StateStoreFactory {
}

private static abstract class NodeFactory {
public final String name;
final String name;
final String[] parents;

NodeFactory(String name) {
NodeFactory(final String name, final String[] parents) {
this.name = name;
this.parents = parents;
}

public abstract ProcessorNode build();

abstract TopologyDescription.AbstractNode describe();
}

private static class ProcessorNodeFactory extends NodeFactory {
private final String[] parents;
private final ProcessorSupplier<?, ?> supplier;
private final Set<String> stateStoreNames = new HashSet<>();

ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier<?, ?> supplier) {
super(name);
this.parents = parents.clone();
super(name, parents.clone());
this.supplier = supplier;
}

Expand All @@ -172,6 +174,11 @@ public void addStateStore(String stateStoreName) {
public ProcessorNode build() {
return new ProcessorNode<>(name, supplier.get(), stateStoreNames);
}

@Override
TopologyDescription.Processor describe() {
return new TopologyDescription.Processor(name, new HashSet<>(stateStoreNames));
}
}

private class SourceNodeFactory extends NodeFactory {
Expand All @@ -187,7 +194,7 @@ private SourceNodeFactory(final String name,
final TimestampExtractor timestampExtractor,
final Deserializer<?> keyDeserializer,
final Deserializer<?> valDeserializer) {
super(name);
super(name, new String[0]);
this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>();
this.pattern = pattern;
this.keyDeserializer = keyDeserializer;
Expand Down Expand Up @@ -236,18 +243,30 @@ public ProcessorNode build() {
private boolean isMatch(String topic) {
return this.pattern.matcher(topic).matches();
}

@Override
TopologyDescription.Source describe() {
String sourceTopics;

if (pattern == null) {
sourceTopics = topics.toString();
sourceTopics = sourceTopics.substring(1, sourceTopics.length() - 1); // trim first and last, ie. []
} else {
sourceTopics = pattern.toString();
}

return new TopologyDescription.Source(name, sourceTopics);
}
}

private class SinkNodeFactory<K, V> extends NodeFactory {
private final String[] parents;
private final String topic;
private final Serializer<K> keySerializer;
private final Serializer<V> valSerializer;
private final StreamPartitioner<? super K, ? super V> partitioner;

private SinkNodeFactory(String name, String[] parents, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner) {
super(name);
this.parents = parents.clone();
super(name, parents.clone());
this.topic = topic;
this.keySerializer = keySerializer;
this.valSerializer = valSerializer;
Expand All @@ -263,6 +282,11 @@ public ProcessorNode build() {
return new SinkNode<>(name, topic, keySerializer, valSerializer, partitioner);
}
}

@Override
TopologyDescription.Sink describe() {
return new TopologyDescription.Sink(name, topic);
}
}

public static class TopicsInfo {
Expand Down Expand Up @@ -1196,12 +1220,8 @@ private Set<String> globalNodeGroups() {
for (final Map.Entry<Integer, Set<String>> nodeGroup : nodeGroups().entrySet()) {
final Set<String> nodes = nodeGroup.getValue();
for (String node : nodes) {
final NodeFactory nodeFactory = nodeFactories.get(node);
if (nodeFactory instanceof SourceNodeFactory) {
final List<String> topics = ((SourceNodeFactory) nodeFactory).topics;
if (topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0))) {
globalGroups.addAll(nodes);
}
if (isGlobalSource(node)) {
globalGroups.addAll(nodes);
}
}
}
Expand Down Expand Up @@ -1558,4 +1578,98 @@ public synchronized void updateSubscriptions(final SubscriptionUpdates subscript
setRegexMatchedTopicsToSourceNodes();
setRegexMatchedTopicToStateStore();
}

private boolean isGlobalSource(final String nodeName) {
final NodeFactory nodeFactory = nodeFactories.get(nodeName);

if (nodeFactory instanceof SourceNodeFactory) {
final List<String> topics = ((SourceNodeFactory) nodeFactory).topics;
if (topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0))) {
return true;
}
}

return false;
}

TopologyDescription describe() {
final TopologyDescription description = new TopologyDescription();

describeSubtopologies(description);
describeGlobalStores(description);

return description;
}

private void describeSubtopologies(final TopologyDescription description) {
for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {

final Set<String> allNodesOfGroups = nodeGroup.getValue();
final boolean isNodeGroupOfGlobalStores = nodeGroupContainsGlobalSourceNode(allNodesOfGroups);

if (!isNodeGroupOfGlobalStores) {
describeSubtopology(description, nodeGroup.getKey(), allNodesOfGroups);
}
}
}

private boolean nodeGroupContainsGlobalSourceNode(final Set<String> allNodesOfGroups) {
for (final String node : allNodesOfGroups) {
if (isGlobalSource(node)) {
return true;
}
}
return false;
}

private void describeSubtopology(final TopologyDescription description,
final Integer subtopologyId,
final Set<String> nodeNames) {

final HashMap<String, TopologyDescription.AbstractNode> nodesByName = new HashMap<>();

// add all nodes
for (final String nodeName : nodeNames) {
nodesByName.put(nodeName, nodeFactories.get(nodeName).describe());
}

// connect each node to its predecessors and successors
for (final TopologyDescription.AbstractNode node : nodesByName.values()) {
for (final String predecessorName : nodeFactories.get(node.name()).parents) {
final TopologyDescription.AbstractNode predecessor = nodesByName.get(predecessorName);
node.addPredecessor(predecessor);
predecessor.addSuccessor(node);
}
}

description.addSubtopology(new TopologyDescription.Subtopology(
subtopologyId,
new HashSet<TopologyDescription.Node>(nodesByName.values())));
}

private void describeGlobalStores(final TopologyDescription description) {
for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {
final Set<String> nodes = nodeGroup.getValue();

final Iterator<String> it = nodes.iterator();
while (it.hasNext()) {
final String node = it.next();

if (isGlobalSource(node)) {
// we found a GlobalStore node group; those contain exactly two node: {sourceNode,processorNode}
it.remove(); // remove sourceNode from group
final String processorNode = nodes.iterator().next(); // get remaining processorNode

description.addGlobalStore(new TopologyDescription.GlobalStore(
node,
processorNode,
((ProcessorNodeFactory) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
nodeToSourceTopics.get(node).get(0)
));
break;
}
}
}
}

}
Loading

0 comments on commit b04bed0

Please sign in to comment.