runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -683,6 +683,10 @@ private List<PTransformOverride> getOverrides(boolean streaming) {
              PTransformMatchers.groupWithShardableStates(),
              new GroupIntoBatchesOverride.StreamingGroupIntoBatchesWithShardedKeyOverrideFactory(
                  this)));
      overridesBuilder.add(
          PTransformOverride.of(
              KafkaIO.Read.KEYED_BY_PARTITION_MATCHER,
              new KeyedByPartitionOverride.StreamingKeyedByPartitionOverrideFactory(this)));

      overridesBuilder
          .add(
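For context (not part of the diff): a minimal sketch of how the override registered above takes effect, assuming `runner` and `pipeline` are in scope. The runner assembles this list in getOverrides(...) and hands it to Pipeline.replaceAll(...) before translation.

// Sketch only; `runner` and `pipeline` are assumed to be in scope.
List<PTransformOverride> overrides =
    Collections.singletonList(
        PTransformOverride.of(
            KafkaIO.Read.KEYED_BY_PARTITION_MATCHER,
            new KeyedByPartitionOverride.StreamingKeyedByPartitionOverrideFactory(runner)));
// Every pipeline node matched by KEYED_BY_PARTITION_MATCHER is swapped for the
// factory's replacement transform.
pipeline.replaceAll(overrides);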
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KeyedByPartitionOverride.java
@@ -0,0 +1,98 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.dataflow;

import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.construction.PTransformReplacements;
import org.apache.beam.sdk.util.construction.ReplacementOutputs;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

@SuppressWarnings({
  "rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
})
public class KeyedByPartitionOverride {

  static class StreamingKeyedByPartitionOverrideFactory<K, V>
      implements PTransformOverrideFactory<
          PCollection<KafkaRecord<K, V>>,
          PCollection<KV<Integer, V>>,
          KafkaIO.Read.KeyedByPartition<K, V>> {

    private final DataflowRunner runner;

    StreamingKeyedByPartitionOverrideFactory(DataflowRunner runner) {
      this.runner = runner;
    }

    @Override
    public PTransformReplacement<PCollection<KafkaRecord<K, V>>, PCollection<KV<Integer, V>>>
        getReplacementTransform(
            AppliedPTransform<
                    PCollection<KafkaRecord<K, V>>,
                    PCollection<KV<Integer, V>>,
                    KafkaIO.Read.KeyedByPartition<K, V>>
                transform) {
      return PTransformReplacement.of(
          PTransformReplacements.getSingletonMainInput(transform),
          new StreamingKeyedByPartition<>(
              runner,
              transform.getTransform(),
              PTransformReplacements.getSingletonMainOutput(transform)));
    }

    @Override
    public Map<PCollection<?>, ReplacementOutput> mapOutputs(
        Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KV<Integer, V>> newOutput) {
      return ReplacementOutputs.singleton(outputs, newOutput);
    }
  }

  static class StreamingKeyedByPartition<K, V>
      extends PTransform<PCollection<KafkaRecord<K, V>>, PCollection<KV<Integer, V>>> {

    private final transient DataflowRunner runner;
    private final KafkaIO.Read.KeyedByPartition<K, V> originalTransform;
    private final transient PCollection<KV<Integer, V>> originalOutput;

    public StreamingKeyedByPartition(
        DataflowRunner runner,
        KafkaIO.Read.KeyedByPartition<K, V> original,
        PCollection<KV<Integer, V>> output) {
      this.runner = runner;
      this.originalTransform = original;
      this.originalOutput = output;
    }
    @Override
    public PCollection<KV<Integer, V>> expand(PCollection<KafkaRecord<K, V>> input) {
      // Record the output PCollection of the original transform: when the replacement
      // transform is wired to other nodes in the graph, the new output is swapped back
      // for the original one, so the old and new outputs are effectively the same node.
      runner.maybeRecordPCollectionPreservedKeys(originalOutput);
      return input.apply(originalTransform);
    }
  }
}
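A note on matching semantics, as a sketch (`appliedTransform` is an assumed AppliedPTransform<?, ?, ?>): the matcher this factory is paired with uses class equality, so only nodes whose transform is exactly KafkaIO.Read.KeyedByPartition are handed to getReplacementTransform.

// Sketch: class-equality matching; a subclass of KeyedByPartition would not match.
PTransformMatcher matcher =
    PTransformMatchers.classEqualTo(KafkaIO.Read.KeyedByPartition.class);
boolean willReplace = matcher.matches(appliedTransform);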
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -63,6 +63,7 @@
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
@@ -1575,6 +1576,10 @@ public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
      return new TypedWithoutMetadata<>(this);
    }

    public PTransform<PBegin, PCollection<KV<Integer, V>>> keyedByPartition() {
      return new ValuesKeyedByPartition<>(this);
    }

    public PTransform<PBegin, PCollection<Row>> externalWithMetadata() {
      return new RowsWithMetadata<>(this);
    }
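A usage sketch for the new entry point (not part of the diff; the broker address, topic, and deserializer choices are placeholders): the read below yields each record's value keyed by the Kafka partition it was read from.

// Sketch: values keyed by partition number, as PCollection<KV<Integer, String>>.
PCollection<KV<Integer, String>> valuesByPartition =
    pipeline.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("broker:9092") // placeholder broker
            .withTopic("events") // placeholder topic
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .keyedByPartition());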
Expand Down Expand Up @@ -1818,6 +1823,28 @@ public Map<PCollection<?>, ReplacementOutput> mapOutputs(
      }
    }

    @Internal
    public static final PTransformMatcher KEYED_BY_PARTITION_MATCHER =
        PTransformMatchers.classEqualTo(KeyedByPartition.class);

    public static class KeyedByPartition<K, V>
        extends PTransform<PCollection<KafkaRecord<K, V>>, PCollection<KV<Integer, V>>> {

      @Override
      public PCollection<KV<Integer, V>> expand(PCollection<KafkaRecord<K, V>> input) {
        return input.apply(
            "Repartition",
            ParDo.of(
                new DoFn<KafkaRecord<K, V>, KV<Integer, V>>() {
                  @ProcessElement
                  public void processElement(ProcessContext ctx) {
                    ctx.output(
                        KV.of(ctx.element().getPartition(), ctx.element().getKV().getValue()));
                  }
                }));
      }
    }
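For readability, a behaviorally equivalent sketch of the DoFn above (assuming a PCollection<KafkaRecord<Long, String>> named `records`): each record's key is dropped and the element is re-keyed by the partition it came from.

// Sketch only: KafkaRecord<K, V> -> KV<partition, value>.
PCollection<KV<Integer, String>> keyed =
    records.apply(
        MapElements.into(
                TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
            .via(r -> KV.of(r.getPartition(), r.getKV().getValue())));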

    private abstract static class AbstractReadFromKafka<K, V>
        extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
      Read<K, V> kafkaRead;
@@ -2170,6 +2197,41 @@ public void populateDisplayData(DisplayData.Builder builder) {
    }
  }

  public static class ValuesKeyedByPartition<K, V>
      extends PTransform<PBegin, PCollection<KV<Integer, V>>> {
    private final Read<K, V> read;

    ValuesKeyedByPartition(Read<K, V> read) {
      super("KafkaIO.Read");
      this.read = read;
    }

    static class Builder<K, V>
        implements ExternalTransformBuilder<
            Read.External.Configuration, PBegin, PCollection<KV<Integer, V>>> {

      @Override
      public PTransform<PBegin, PCollection<KV<Integer, V>>> buildExternal(
          Read.External.Configuration config) {
        Read.Builder<K, V> readBuilder = new AutoValue_KafkaIO_Read.Builder<>();
        Read.Builder.setupExternalBuilder(readBuilder, config);

        return readBuilder.build().keyedByPartition();
      }
    }

    @Override
    public PCollection<KV<Integer, V>> expand(PBegin begin) {
      return begin.apply(read).apply("KeyedByPartition", new Read.KeyedByPartition<>());
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      super.populateDisplayData(builder);
      read.populateDisplayData(builder);
    }
  }
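Since expand() above simply chains the raw read with the KeyedByPartition step, the two applications below build the same graph (a sketch; `read` stands for any fully configured KafkaIO.Read<Long, String>).

// Sketch: equivalent by construction of ValuesKeyedByPartition.expand().
PCollection<KV<Integer, String>> a = pipeline.apply(read.keyedByPartition());
PCollection<KV<Integer, String>> b =
    pipeline
        .apply(read)
        .apply("KeyedByPartition", new KafkaIO.Read.KeyedByPartition<Long, String>());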

  /**
   * A {@link PTransform} to read from Kafka topics. Similar to {@link KafkaIO.Read}, but removes
   * Kafka metadata and returns a {@link PCollection} of {@link KV}. See {@link KafkaIO} for more