Skip to content

Commit 8ef3a24

Browse files
authored
Change KafkaIO to default to offset-based deduplication when redistribute is enabled for Dataflow java runner. (#36849)
* Add kafka read override to Dataflow java runner.
1 parent e336419 commit 8ef3a24

File tree

4 files changed

+214
-0
lines changed

4 files changed

+214
-0
lines changed

runners/google-cloud-dataflow-java/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ dependencies {
129129
testImplementation library.java.google_cloud_dataflow_java_proto_library_all
130130
testImplementation library.java.jackson_dataformat_yaml
131131
testImplementation library.java.mockito_inline
132+
testImplementation project(":sdks:java:io:kafka")
133+
testImplementation library.java.kafka_clients
132134
validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
133135
validatesRunner project(path: project.path, configuration: "testRuntimeMigration")
134136
validatesRunner library.java.hamcrest

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,10 @@ private List<PTransformOverride> getOverrides(boolean streaming) {
659659

660660
try {
661661
overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE);
662+
overridesBuilder.add(
663+
PTransformOverride.of(
664+
KafkaReadWithRedistributeOverride.matcher(),
665+
new KafkaReadWithRedistributeOverride.Factory()));
662666
} catch (NoClassDefFoundError e) {
663667
// Do nothing. io-kafka is an optional dependency of runners-google-cloud-dataflow-java
664668
// and only needed when KafkaIO is used in the pipeline.
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.dataflow;
19+
20+
import java.util.Map;
21+
import org.apache.beam.sdk.io.kafka.KafkaIO;
22+
import org.apache.beam.sdk.io.kafka.KafkaRecord;
23+
import org.apache.beam.sdk.runners.AppliedPTransform;
24+
import org.apache.beam.sdk.runners.PTransformMatcher;
25+
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
26+
import org.apache.beam.sdk.util.construction.ReplacementOutputs;
27+
import org.apache.beam.sdk.values.PBegin;
28+
import org.apache.beam.sdk.values.PCollection;
29+
import org.apache.beam.sdk.values.TupleTag;
30+
31+
public final class KafkaReadWithRedistributeOverride {
32+
33+
private KafkaReadWithRedistributeOverride() {}
34+
35+
public static PTransformMatcher matcher() {
36+
return new PTransformMatcher() {
37+
@SuppressWarnings({
38+
"PatternMatchingInstanceof" // For compiling on older Java versions.
39+
})
40+
@Override
41+
public boolean matches(AppliedPTransform<?, ?, ?> application) {
42+
if (application.getTransform() instanceof KafkaIO.Read) {
43+
return ((KafkaIO.Read) application.getTransform()).isRedistributed();
44+
}
45+
return false;
46+
}
47+
};
48+
}
49+
50+
/**
51+
* {@link PTransformOverrideFactory} for {@link KafkaIO.Read} that enables {@code
52+
* withOffsetDeduplication} when {@code withRedistribute} is enabled.
53+
*/
54+
static class Factory<K, V>
55+
implements PTransformOverrideFactory<
56+
PBegin, PCollection<KafkaRecord<K, V>>, KafkaIO.Read<K, V>> {
57+
58+
@Override
59+
public PTransformReplacement<PBegin, PCollection<KafkaRecord<K, V>>> getReplacementTransform(
60+
AppliedPTransform<PBegin, PCollection<KafkaRecord<K, V>>, KafkaIO.Read<K, V>> transform) {
61+
KafkaIO.Read<K, V> read = transform.getTransform();
62+
if (read.getOffsetDeduplication() == null) {
63+
return PTransformReplacement.of(
64+
transform.getPipeline().begin(), read.withOffsetDeduplication(true));
65+
}
66+
return PTransformReplacement.of(transform.getPipeline().begin(), read);
67+
}
68+
69+
@Override
70+
public Map<PCollection<?>, ReplacementOutput> mapOutputs(
71+
Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KafkaRecord<K, V>> newOutput) {
72+
return ReplacementOutputs.singleton(outputs, newOutput);
73+
}
74+
}
75+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.dataflow;
19+
20+
import static org.hamcrest.MatcherAssert.assertThat;
21+
import static org.hamcrest.Matchers.nullValue;
22+
import static org.junit.Assert.assertFalse;
23+
import static org.junit.Assert.assertTrue;
24+
25+
import java.io.Serializable;
26+
import java.util.Collections;
27+
import org.apache.beam.sdk.Pipeline;
28+
import org.apache.beam.sdk.io.kafka.KafkaIO;
29+
import org.apache.beam.sdk.runners.PTransformOverride;
30+
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
31+
import org.apache.beam.sdk.testing.TestPipeline;
32+
import org.apache.kafka.common.serialization.StringDeserializer;
33+
import org.junit.Rule;
34+
import org.junit.Test;
35+
import org.junit.runner.RunWith;
36+
import org.junit.runners.JUnit4;
37+
38+
@RunWith(JUnit4.class)
39+
public class KafkaReadWithRedistributeOverrideTest implements Serializable {
40+
@Rule public transient TestPipeline p = TestPipeline.create();
41+
42+
@Test
43+
public void testOverrideAppliedWhenRedistributeEnabled() {
44+
p.apply(
45+
"MatchingRead",
46+
KafkaIO.<String, String>read()
47+
.withBootstrapServers("localhost:9092")
48+
.withTopic("test_match")
49+
.withKeyDeserializer(StringDeserializer.class)
50+
.withValueDeserializer(StringDeserializer.class)
51+
.withRedistribute());
52+
p.apply(
53+
"NoRedistribute",
54+
KafkaIO.<String, String>read()
55+
.withBootstrapServers("localhost:9092")
56+
.withTopic("test_no_redistribute")
57+
.withKeyDeserializer(StringDeserializer.class)
58+
.withValueDeserializer(StringDeserializer.class));
59+
p.apply(
60+
"ExplicitlyDisable",
61+
KafkaIO.<String, String>read()
62+
.withBootstrapServers("localhost:9092")
63+
.withTopic("test_disabled")
64+
.withKeyDeserializer(StringDeserializer.class)
65+
.withValueDeserializer(StringDeserializer.class)
66+
.withOffsetDeduplication(false));
67+
p.apply(
68+
"ExplicitlyEnable",
69+
KafkaIO.<String, String>read()
70+
.withBootstrapServers("localhost:9092")
71+
.withTopic("test_enabled")
72+
.withKeyDeserializer(StringDeserializer.class)
73+
.withValueDeserializer(StringDeserializer.class)
74+
.withRedistribute()
75+
.withOffsetDeduplication(true));
76+
77+
p.replaceAll(
78+
Collections.singletonList(
79+
PTransformOverride.of(
80+
KafkaReadWithRedistributeOverride.matcher(),
81+
new KafkaReadWithRedistributeOverride.Factory<>())));
82+
83+
Pipeline.PipelineVisitor visitor =
84+
new Pipeline.PipelineVisitor.Defaults() {
85+
86+
private boolean matchingVisited = false;
87+
private boolean noRedistributeVisited = false;
88+
private boolean explicitlyDisabledVisited = false;
89+
private boolean explicitlyEnabledVisited = false;
90+
91+
@Override
92+
public CompositeBehavior enterCompositeTransform(Node node) {
93+
if (node.getTransform() instanceof KafkaIO.Read) {
94+
KafkaIO.Read<?, ?> read = (KafkaIO.Read<?, ?>) node.getTransform();
95+
if (read.getTopics().contains("test_match")) {
96+
assertTrue(read.isRedistributed());
97+
assertTrue(read.getOffsetDeduplication());
98+
assertFalse(matchingVisited);
99+
matchingVisited = true;
100+
} else if (read.getTopics().contains("test_no_redistribute")) {
101+
assertFalse(read.isRedistributed());
102+
assertThat(read.getOffsetDeduplication(), nullValue());
103+
assertFalse(noRedistributeVisited);
104+
noRedistributeVisited = true;
105+
} else if (read.getTopics().contains("test_disabled")) {
106+
assertFalse(read.isRedistributed());
107+
assertFalse(read.getOffsetDeduplication());
108+
assertFalse(explicitlyDisabledVisited);
109+
explicitlyDisabledVisited = true;
110+
} else if (read.getTopics().contains("test_enabled")) {
111+
assertTrue(read.isRedistributed());
112+
assertTrue(read.getOffsetDeduplication());
113+
assertFalse(explicitlyEnabledVisited);
114+
explicitlyEnabledVisited = true;
115+
}
116+
}
117+
return CompositeBehavior.ENTER_TRANSFORM;
118+
}
119+
120+
@Override
121+
public void leaveCompositeTransform(Node node) {
122+
if (node.isRootNode()) {
123+
assertTrue("Matching transform was not visited", matchingVisited);
124+
assertTrue("No redistribute transform was not visited", noRedistributeVisited);
125+
assertTrue(
126+
"Explicitly disabled transform was not visited", explicitlyDisabledVisited);
127+
assertTrue("Explicitly enabled transform was not visited", explicitlyEnabledVisited);
128+
}
129+
}
130+
};
131+
p.traverseTopologically(visitor);
132+
}
133+
}

0 commit comments

Comments
 (0)