Skip to content

Commit eecb99d

Browse files
committed
Add OutputBuilder to the Java SDK and use in runners
1 parent 08947d1 commit eecb99d

File tree

49 files changed

+1190
-683
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+1190
-683
lines changed

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1194,6 +1194,7 @@ class BeamModulePlugin implements Plugin<Project> {
11941194

11951195
List<String> skipDefRegexes = []
11961196
skipDefRegexes << "AutoValue_.*"
1197+
skipDefRegexes << "AutoBuilder_.*"
11971198
skipDefRegexes << "AutoOneOf_.*"
11981199
skipDefRegexes << ".*\\.jmh_generated\\..*"
11991200
skipDefRegexes += configuration.generatedClassPatterns
@@ -1287,7 +1288,8 @@ class BeamModulePlugin implements Plugin<Project> {
12871288
'**/org/apache/beam/gradle/**',
12881289
'**/org/apache/beam/model/**',
12891290
'**/org/apache/beam/runners/dataflow/worker/windmill/**',
1290-
'**/AutoValue_*'
1291+
'**/AutoValue_*',
1292+
'**/AutoBuilder_*',
12911293
]
12921294

12931295
def jacocoEnabled = project.hasProperty('enableJacocoReport')

runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ public static <K, V> Iterable<WindowedValue<V>> dropExpiredWindows(
8181
if (input == null) {
8282
return null;
8383
}
84-
return input.explodeWindows();
84+
// The generics in this chain of calls line up best if we drop the covariance
85+
// in the return value of explodeWindows()
86+
return (Iterable<WindowedValue<V>>) input.explodeWindows();
8587
})
8688
.filter(
8789
input -> {

runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
4646
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
4747
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
48+
import org.apache.beam.sdk.util.OutputBuilderSuppliers;
4849
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
4950
import org.apache.beam.sdk.values.KV;
5051
import org.apache.beam.sdk.values.PCollectionView;
@@ -180,7 +181,8 @@ public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
180181

181182
@Override
182183
public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
183-
return DoFnOutputReceivers.windowedReceiver(processContext, null);
184+
return DoFnOutputReceivers.windowedReceiver(
185+
processContext, OutputBuilderSuppliers.supplierForElement(element), null);
184186
}
185187

186188
@Override
@@ -190,7 +192,8 @@ public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
190192

191193
@Override
192194
public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
193-
return DoFnOutputReceivers.windowedMultiReceiver(processContext, null);
195+
return DoFnOutputReceivers.windowedMultiReceiver(
196+
processContext, OutputBuilderSuppliers.supplierForElement(element));
194197
}
195198

196199
@Override
@@ -385,12 +388,12 @@ public PaneInfo pane() {
385388

386389
@Override
387390
public String currentRecordId() {
388-
return element.getCurrentRecordId();
391+
return element.getRecordId();
389392
}
390393

391394
@Override
392395
public Long currentRecordOffset() {
393-
return element.getCurrentRecordOffset();
396+
return element.getRecordOffset();
394397
}
395398

396399
@Override

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1057,8 +1057,13 @@ private void prefetchOnTrigger(
10571057
}
10581058

10591059
// Output the actual value.
1060-
outputter.output(
1061-
WindowedValues.of(KV.of(key, toOutput), outputTimestamp, windows, paneInfo));
1060+
WindowedValues.<KV<K, OutputT>>builder()
1061+
.setValue(KV.of(key, toOutput))
1062+
.setTimestamp(outputTimestamp)
1063+
.setWindows(windows)
1064+
.setPaneInfo(paneInfo)
1065+
.setReceiver(outputter)
1066+
.output();
10621067
});
10631068

10641069
reduceFn.onTrigger(renamedTriggerContext);

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 64 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
5252
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
5353
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
54+
import org.apache.beam.sdk.util.OutputBuilderSupplier;
55+
import org.apache.beam.sdk.util.OutputBuilderSuppliers;
5456
import org.apache.beam.sdk.util.SystemDoFnInternal;
5557
import org.apache.beam.sdk.util.UserCodeException;
5658
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
@@ -113,7 +115,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
113115

114116
final @Nullable SchemaCoder<OutputT> mainOutputSchemaCoder;
115117

116-
private @Nullable Map<TupleTag<?>, Coder<?>> outputCoders;
118+
private final @Nullable Map<TupleTag<?>, Coder<?>> outputCoders;
117119

118120
private final @Nullable DoFnSchemaInformation doFnSchemaInformation;
119121

@@ -395,6 +397,8 @@ private class DoFnProcessContext extends DoFn<InputT, OutputT>.ProcessContext
395397
/** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
396398
private @Nullable StateNamespace namespace;
397399

400+
private final OutputBuilderSupplier builderSupplier;
401+
398402
/**
399403
* The state namespace for this context.
400404
*
@@ -412,6 +416,7 @@ private StateNamespace getNamespace() {
412416
private DoFnProcessContext(WindowedValue<InputT> elem) {
413417
fn.super();
414418
this.elem = elem;
419+
this.builderSupplier = OutputBuilderSuppliers.supplierForElement(elem);
415420
}
416421

417422
@Override
@@ -494,8 +499,17 @@ public <T> void outputWindowedValue(
494499
Instant timestamp,
495500
Collection<? extends BoundedWindow> windows,
496501
PaneInfo paneInfo) {
497-
SimpleDoFnRunner.this.outputWindowedValue(
498-
tag, WindowedValues.of(output, timestamp, windows, paneInfo));
502+
builderSupplier
503+
.builder(output)
504+
.setTimestamp(timestamp)
505+
.setWindows(windows)
506+
.setPaneInfo(paneInfo)
507+
.setReceiver(
508+
wv -> {
509+
checkTimestamp(elem.getTimestamp(), wv.getTimestamp());
510+
SimpleDoFnRunner.this.outputWindowedValue(tag, wv);
511+
})
512+
.output();
499513
}
500514

501515
@Override
@@ -520,12 +534,12 @@ public Instant timestamp() {
520534

521535
@Override
522536
public String currentRecordId() {
523-
return elem.getCurrentRecordId();
537+
return elem.getRecordId();
524538
}
525539

526540
@Override
527541
public Long currentRecordOffset() {
528-
return elem.getCurrentRecordOffset();
542+
return elem.getRecordOffset();
529543
}
530544

531545
public Collection<? extends BoundedWindow> windows() {
@@ -604,17 +618,18 @@ public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
604618

605619
@Override
606620
public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
607-
return DoFnOutputReceivers.windowedReceiver(this, mainOutputTag);
621+
return DoFnOutputReceivers.windowedReceiver(this, builderSupplier, mainOutputTag);
608622
}
609623

610624
@Override
611625
public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
612-
return DoFnOutputReceivers.rowReceiver(this, mainOutputTag, mainOutputSchemaCoder);
626+
return DoFnOutputReceivers.rowReceiver(
627+
this, builderSupplier, mainOutputTag, mainOutputSchemaCoder);
613628
}
614629

615630
@Override
616631
public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
617-
return DoFnOutputReceivers.windowedMultiReceiver(this, outputCoders);
632+
return DoFnOutputReceivers.windowedMultiReceiver(this, builderSupplier, outputCoders);
618633
}
619634

620635
@Override
@@ -710,6 +725,7 @@ private class OnTimerArgumentProvider<KeyT> extends DoFn<InputT, OutputT>.OnTime
710725
private final TimeDomain timeDomain;
711726
private final String timerId;
712727
private final KeyT key;
728+
private final OutputBuilderSupplier builderSupplier;
713729

714730
/** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
715731
private @Nullable StateNamespace namespace;
@@ -742,6 +758,13 @@ private OnTimerArgumentProvider(
742758
this.timestamp = timestamp;
743759
this.timeDomain = timeDomain;
744760
this.key = key;
761+
this.builderSupplier =
762+
OutputBuilderSuppliers.supplierForElement(
763+
WindowedValues.builder()
764+
.setValue(null)
765+
.setTimestamp(timestamp)
766+
.setWindow(window)
767+
.setPaneInfo(PaneInfo.NO_FIRING));
745768
}
746769

747770
@Override
@@ -828,17 +851,19 @@ public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
828851

829852
@Override
830853
public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
831-
return DoFnOutputReceivers.windowedReceiver(this, mainOutputTag);
854+
return DoFnOutputReceivers.windowedReceiver(this, builderSupplier, mainOutputTag);
832855
}
833856

834857
@Override
835858
public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
836-
return DoFnOutputReceivers.rowReceiver(this, mainOutputTag, mainOutputSchemaCoder);
859+
return DoFnOutputReceivers.rowReceiver(
860+
this, builderSupplier, mainOutputTag, mainOutputSchemaCoder);
837861
}
838862

839863
@Override
840864
public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
841-
return DoFnOutputReceivers.windowedMultiReceiver(this, outputCoders);
865+
// ... what to doooo 0...
866+
return DoFnOutputReceivers.windowedMultiReceiver(this, builderSupplier, outputCoders);
842867
}
843868

844869
@Override
@@ -978,8 +1003,14 @@ public <T> void outputWindowedValue(
9781003
Collection<? extends BoundedWindow> windows,
9791004
PaneInfo paneInfo) {
9801005
checkTimestamp(timestamp(), timestamp);
981-
SimpleDoFnRunner.this.outputWindowedValue(
982-
tag, WindowedValues.of(output, timestamp, windows, paneInfo));
1006+
1007+
builderSupplier
1008+
.builder(output)
1009+
.setTimestamp(timestamp)
1010+
.setWindows(windows)
1011+
.setPaneInfo(paneInfo)
1012+
.setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv))
1013+
.output();
9831014
}
9841015

9851016
@Override
@@ -1015,6 +1046,8 @@ private class OnWindowExpirationArgumentProvider<KeyT>
10151046
private final BoundedWindow window;
10161047
private final Instant timestamp;
10171048
private final KeyT key;
1049+
private final OutputBuilderSupplier builderSupplier;
1050+
10181051
/** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
10191052
private @Nullable StateNamespace namespace;
10201053

@@ -1037,6 +1070,13 @@ private OnWindowExpirationArgumentProvider(BoundedWindow window, Instant timesta
10371070
this.window = window;
10381071
this.timestamp = timestamp;
10391072
this.key = key;
1073+
this.builderSupplier =
1074+
OutputBuilderSuppliers.supplierForElement(
1075+
WindowedValues.<Void>builder()
1076+
.setValue(null)
1077+
.setWindow(window)
1078+
.setTimestamp(timestamp)
1079+
.setPaneInfo(PaneInfo.NO_FIRING));
10401080
}
10411081

10421082
@Override
@@ -1109,17 +1149,18 @@ public KeyT key() {
11091149

11101150
@Override
11111151
public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
1112-
return DoFnOutputReceivers.windowedReceiver(this, mainOutputTag);
1152+
return DoFnOutputReceivers.windowedReceiver(this, builderSupplier, mainOutputTag);
11131153
}
11141154

11151155
@Override
11161156
public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
1117-
return DoFnOutputReceivers.rowReceiver(this, mainOutputTag, mainOutputSchemaCoder);
1157+
return DoFnOutputReceivers.rowReceiver(
1158+
this, builderSupplier, mainOutputTag, mainOutputSchemaCoder);
11181159
}
11191160

11201161
@Override
11211162
public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
1122-
return DoFnOutputReceivers.windowedMultiReceiver(this, outputCoders);
1163+
return DoFnOutputReceivers.windowedMultiReceiver(this, builderSupplier, outputCoders);
11231164
}
11241165

11251166
@Override
@@ -1241,8 +1282,13 @@ public <T> void outputWindowedValue(
12411282
Collection<? extends BoundedWindow> windows,
12421283
PaneInfo paneInfo) {
12431284
checkTimestamp(this.timestamp, timestamp);
1244-
SimpleDoFnRunner.this.outputWindowedValue(
1245-
tag, WindowedValues.of(output, timestamp, windows, paneInfo));
1285+
builderSupplier
1286+
.builder(output)
1287+
.setTimestamp(timestamp)
1288+
.setWindows(windows)
1289+
.setPaneInfo(paneInfo)
1290+
.setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv))
1291+
.output();
12461292
}
12471293

12481294
@Override

runners/core-java/src/main/java/org/apache/beam/runners/core/WindowMatchers.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,15 @@ public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
103103
Matchers.equalTo(value), Matchers.equalTo(timestamp), Matchers.equalTo(window));
104104
}
105105

106+
public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
107+
T value, BoundedWindow window) {
108+
return WindowMatchers.isSingleWindowedValue(
109+
Matchers.equalTo(value),
110+
Matchers.anything(),
111+
Matchers.equalTo(window),
112+
Matchers.anything());
113+
}
114+
106115
public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
107116
Matcher<T> valueMatcher, long timestamp, long windowStart, long windowEnd) {
108117
IntervalWindow intervalWindow =
@@ -170,6 +179,10 @@ protected void describeMismatchSafely(
170179
};
171180
}
172181

182+
public static <T> Matcher<WindowedValue<? extends T>> isValueInGlobalWindow(T value) {
183+
return isSingleWindowedValue(value, GlobalWindow.INSTANCE);
184+
}
185+
173186
public static <T> Matcher<WindowedValue<? extends T>> isValueInGlobalWindow(
174187
T value, Instant timestamp) {
175188
return isSingleWindowedValue(value, timestamp, GlobalWindow.INSTANCE);

runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchersTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.hamcrest.MatcherAssert.assertThat;
2121

22+
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2223
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
2324
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
2425
import org.apache.beam.sdk.values.WindowedValues;
@@ -75,4 +76,29 @@ public void testIsWindowedValueReorderedWindows() {
7576
new IntervalWindow(new Instant(windowStart2), new Instant(windowEnd2))),
7677
PaneInfo.NO_FIRING));
7778
}
79+
80+
@Test
81+
public void test_IsValueInGlobalWindow_TimestampedValueInGlobalWindow() {
82+
assertThat(
83+
WindowedValues.timestampedValueInGlobalWindow("foo", new Instant(7)),
84+
WindowMatchers.isValueInGlobalWindow("foo", new Instant(7)));
85+
86+
assertThat(
87+
WindowedValues.timestampedValueInGlobalWindow("foo", BoundedWindow.TIMESTAMP_MIN_VALUE),
88+
WindowMatchers.isValueInGlobalWindow("foo", BoundedWindow.TIMESTAMP_MIN_VALUE));
89+
90+
assertThat(
91+
WindowedValues.timestampedValueInGlobalWindow("foo", BoundedWindow.TIMESTAMP_MIN_VALUE),
92+
WindowMatchers.isValueInGlobalWindow("foo"));
93+
}
94+
95+
@Test
96+
public void test_IsValueInGlobalWindow_ValueInGlobalWindow() {
97+
assertThat(
98+
WindowedValues.valueInGlobalWindow("foo"), WindowMatchers.isValueInGlobalWindow("foo"));
99+
100+
assertThat(
101+
WindowedValues.valueInGlobalWindow("foo"),
102+
WindowMatchers.isValueInGlobalWindow("foo", BoundedWindow.TIMESTAMP_MIN_VALUE));
103+
}
78104
}

runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,8 @@ private BundleWindowedValueReceiver(UncommittedBundle<KV<K, Iterable<V>>> bundle
246246
}
247247

248248
@Override
249-
public void output(WindowedValue<KV<K, Iterable<V>>> valueWithMetadata) {
250-
bundle.add(valueWithMetadata);
249+
public void output(WindowedValue<KV<K, Iterable<V>>> windowedValue) {
250+
bundle.add(windowedValue);
251251
}
252252
}
253253
}

runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,7 @@ public WindowIntoEvaluator(
9090
public void processElement(WindowedValue<InputT> compressedElement) throws Exception {
9191
for (WindowedValue<InputT> element : compressedElement.explodeWindows()) {
9292
Collection<? extends BoundedWindow> windows = assignWindows(windowFn, element);
93-
outputBundle.add(
94-
WindowedValues.of(
95-
element.getValue(), element.getTimestamp(), windows, element.getPaneInfo()));
93+
WindowedValues.builder(element).setWindows(windows).setReceiver(outputBundle::add).output();
9694
}
9795
}
9896

0 commit comments

Comments
 (0)