Skip to content

Commit efe4648

Browse files
committed
WIP: add OutputBuilder
1 parent 60307b4 commit efe4648

File tree

85 files changed

+1041
-643
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

85 files changed

+1041
-643
lines changed

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1198,6 +1198,7 @@ class BeamModulePlugin implements Plugin<Project> {
11981198

11991199
List<String> skipDefRegexes = []
12001200
skipDefRegexes << "AutoValue_.*"
1201+
skipDefRegexes << "AutoBuilder_.*"
12011202
skipDefRegexes << "AutoOneOf_.*"
12021203
skipDefRegexes << ".*\\.jmh_generated\\..*"
12031204
skipDefRegexes += configuration.generatedClassPatterns
@@ -1291,7 +1292,8 @@ class BeamModulePlugin implements Plugin<Project> {
12911292
'**/org/apache/beam/gradle/**',
12921293
'**/org/apache/beam/model/**',
12931294
'**/org/apache/beam/runners/dataflow/worker/windmill/**',
1294-
'**/AutoValue_*'
1295+
'**/AutoValue_*',
1296+
'**/AutoBuilder_*',
12951297
]
12961298

12971299
def jacocoEnabled = project.hasProperty('enableJacocoReport')

runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2626
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
2727
import org.apache.beam.sdk.util.SystemDoFnInternal;
28+
import org.apache.beam.sdk.util.ValueWithMetadataReceiver;
2829
import org.apache.beam.sdk.util.WindowedValue;
2930
import org.apache.beam.sdk.util.construction.TriggerTranslation;
3031
import org.apache.beam.sdk.values.KV;
@@ -91,8 +92,8 @@ public GroupAlsoByWindowViaWindowSetNewDoFn(
9192
this.triggerProto = TriggerTranslation.toProto(windowingStrategy.getTrigger());
9293
}
9394

94-
private OutputWindowedValue<KV<K, OutputT>> outputWindowedValue() {
95-
return new OutputWindowedValue<KV<K, OutputT>>() {
95+
private ValueWithMetadataReceiver<KV<K, OutputT>> outputWindowedValue() {
96+
return new ValueWithMetadataReceiver<KV<K, OutputT>>() {
9697
@Override
9798
public void outputWindowedValue(
9899
KV<K, OutputT> output,

runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public <K, InputT> Iterable<WindowedValue<InputT>> filter(
141141
} else {
142142
nonLateElements.add(
143143
WindowedValue.of(
144-
element.getValue(), element.getTimestamp(), window, element.getPane()));
144+
element.getValue(), element.getTimestamp(), window, element.getPaneInfo()));
145145
}
146146
}
147147
}

runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
4646
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
4747
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
48+
import org.apache.beam.sdk.util.ValueWithMetadataReceiver;
4849
import org.apache.beam.sdk.util.WindowedValue;
4950
import org.apache.beam.sdk.values.KV;
5051
import org.apache.beam.sdk.values.PCollectionView;
@@ -72,7 +73,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
7273
InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> {
7374
private final DoFn<InputT, OutputT> fn;
7475
private final PipelineOptions pipelineOptions;
75-
private final OutputWindowedValue<OutputT> output;
76+
private final ValueWithMetadataReceiver<OutputT> output;
7677
private final SideInputReader sideInputReader;
7778
private final ScheduledExecutorService executor;
7879
private final int maxNumOutputs;
@@ -98,7 +99,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
9899
public OutputAndTimeBoundedSplittableProcessElementInvoker(
99100
DoFn<InputT, OutputT> fn,
100101
PipelineOptions pipelineOptions,
101-
OutputWindowedValue<OutputT> output,
102+
ValueWithMetadataReceiver<OutputT> output,
102103
SideInputReader sideInputReader,
103104
ScheduledExecutorService executor,
104105
int maxNumOutputs,
@@ -375,7 +376,7 @@ public Instant timestamp() {
375376

376377
@Override
377378
public PaneInfo pane() {
378-
return element.getPane();
379+
return element.getPaneInfo();
379380
}
380381

381382
@Override
@@ -390,7 +391,7 @@ public void output(OutputT output) {
390391

391392
@Override
392393
public void outputWithTimestamp(OutputT value, Instant timestamp) {
393-
outputWindowedValue(value, timestamp, element.getWindows(), element.getPane());
394+
outputWindowedValue(value, timestamp, element.getWindows(), element.getPaneInfo());
394395
}
395396

396397
@Override
@@ -403,7 +404,7 @@ public void outputWindowedValue(
403404
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
404405
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
405406
}
406-
output.outputWindowedValue(value, timestamp, windows, paneInfo);
407+
output.output(WindowedValue.of(value, timestamp, windows, paneInfo));
407408
}
408409

409410
@Override
@@ -413,7 +414,7 @@ public <T> void output(TupleTag<T> tag, T value) {
413414

414415
@Override
415416
public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
416-
outputWindowedValue(tag, value, timestamp, element.getWindows(), element.getPane());
417+
outputWindowedValue(tag, value, timestamp, element.getWindows(), element.getPaneInfo());
417418
}
418419

419420
@Override

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.beam.sdk.transforms.windowing.Window;
4747
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
4848
import org.apache.beam.sdk.transforms.windowing.WindowFn;
49+
import org.apache.beam.sdk.util.ValueWithMetadataReceiver;
4950
import org.apache.beam.sdk.util.WindowTracing;
5051
import org.apache.beam.sdk.util.WindowedValue;
5152
import org.apache.beam.sdk.values.KV;
@@ -106,7 +107,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
106107
*/
107108
private final WindowingStrategy<Object, W> windowingStrategy;
108109

109-
private final OutputWindowedValue<KV<K, OutputT>> outputter;
110+
private final ValueWithMetadataReceiver<KV<K, OutputT>> outputter;
110111

111112
private final StateInternals stateInternals;
112113

@@ -214,7 +215,7 @@ public ReduceFnRunner(
214215
ExecutableTriggerStateMachine triggerStateMachine,
215216
StateInternals stateInternals,
216217
TimerInternals timerInternals,
217-
OutputWindowedValue<KV<K, OutputT>> outputter,
218+
ValueWithMetadataReceiver<KV<K, OutputT>> outputter,
218219
@Nullable SideInputReader sideInputReader,
219220
ReduceFn<K, InputT, OutputT, W> reduceFn,
220221
@Nullable PipelineOptions options) {

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ public <T> T sideInput(PCollectionView<T> view) {
404404

405405
@Override
406406
public PaneInfo pane() {
407-
return elem.getPane();
407+
return elem.getPaneInfo();
408408
}
409409

410410
@Override
@@ -436,7 +436,7 @@ public <T> void output(TupleTag<T> tag, T output) {
436436
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
437437
checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null");
438438
checkTimestamp(elem.getTimestamp(), timestamp);
439-
outputWindowedValue(tag, output, timestamp, elem.getWindows(), elem.getPane());
439+
outputWindowedValue(tag, output, timestamp, elem.getWindows(), elem.getPaneInfo());
440440
}
441441

442442
@Override
@@ -446,8 +446,16 @@ public <T> void outputWindowedValue(
446446
Instant timestamp,
447447
Collection<? extends BoundedWindow> windows,
448448
PaneInfo paneInfo) {
449-
SimpleDoFnRunner.this.outputWindowedValue(
450-
tag, WindowedValue.of(output, timestamp, windows, paneInfo));
449+
WindowedValue.Builder<T> builder =
450+
((WindowedValue.Builder<T>) elem.toBuilder())
451+
.setValue(output)
452+
.setTimestamp(timestamp)
453+
.setWindows(windows)
454+
.setPaneInfo(paneInfo);
455+
456+
SimpleDoFnRunner.this.outputWindowedValue(tag, builder.build());
457+
458+
SimpleDoFnRunner.this.outputWindowedValue(tag, builder.builder());
451459
}
452460

453461
@Override

runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ public PipelineOptions pipelineOptions() {
430430

431431
@Override
432432
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
433-
return elementAndRestriction.getKey().getPane();
433+
return elementAndRestriction.getKey().getPaneInfo();
434434
}
435435

436436
@Override
@@ -490,7 +490,7 @@ public PipelineOptions pipelineOptions() {
490490

491491
@Override
492492
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
493-
return elementAndRestriction.getKey().getPane();
493+
return elementAndRestriction.getKey().getPaneInfo();
494494
}
495495

496496
@Override
@@ -544,7 +544,7 @@ public PipelineOptions pipelineOptions() {
544544

545545
@Override
546546
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
547-
return elementAndRestriction.getKey().getPane();
547+
return elementAndRestriction.getKey().getPaneInfo();
548548
}
549549

550550
@Override

runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
4242
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
4343
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
44+
import org.apache.beam.sdk.util.ValueWithMetadataReceiver;
4445
import org.apache.beam.sdk.util.WindowedValue;
4546
import org.apache.beam.sdk.values.TupleTag;
4647
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
@@ -112,7 +113,7 @@ private SplittableProcessElementInvoker<Void, String, OffsetRange, Long, Void>.R
112113
new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
113114
fn,
114115
PipelineOptionsFactory.create(),
115-
new OutputWindowedValue<String>() {
116+
new ValueWithMetadataReceiver<String>() {
116117
@Override
117118
public void outputWindowedValue(
118119
String output,

runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -759,7 +759,8 @@ public void testWatermarkHoldAndLateData() throws Exception {
759759
equalTo(new Instant(1)),
760760
equalTo((BoundedWindow) expectedWindow))));
761761
assertThat(
762-
output.get(0).getPane(), equalTo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
762+
output.get(0).getPaneInfo(),
763+
equalTo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
763764

764765
// There is no end-of-window hold, but the timer set by the trigger holds the watermark
765766
assertThat(tester.getWatermarkHold(), nullValue());
@@ -805,7 +806,8 @@ public void testWatermarkHoldAndLateData() throws Exception {
805806
0, // window start
806807
10))); // window end
807808
assertThat(
808-
output.get(0).getPane(), equalTo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)));
809+
output.get(0).getPaneInfo(),
810+
equalTo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)));
809811

810812
// Since the element hold is cleared, there is no hold remaining
811813
assertThat(tester.getWatermarkHold(), nullValue());
@@ -846,7 +848,8 @@ public void testWatermarkHoldAndLateData() throws Exception {
846848
0, // window start
847849
10))); // window end
848850
assertThat(
849-
output.get(0).getPane(), equalTo(PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0)));
851+
output.get(0).getPaneInfo(),
852+
equalTo(PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0)));
850853

851854
tester.setAutoAdvanceOutputWatermark(true);
852855

@@ -879,7 +882,7 @@ public void testWatermarkHoldAndLateData() throws Exception {
879882
0, // window start
880883
10))); // window end
881884
assertThat(
882-
output.get(0).getPane(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 3, 1)));
885+
output.get(0).getPaneInfo(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 3, 1)));
883886
assertEquals(new Instant(50), tester.getOutputWatermark());
884887
assertEquals(null, tester.getWatermarkHold());
885888

@@ -1486,7 +1489,8 @@ public void testMergeBeforeFinalizing() throws Exception {
14861489
1, // window start
14871490
20)); // window end
14881491
assertThat(
1489-
output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
1492+
output.get(0).getPaneInfo(),
1493+
equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
14901494
}
14911495

14921496
/**
@@ -1527,7 +1531,8 @@ public void testMergingWithCloseBeforeGC() throws Exception {
15271531
1, // window start
15281532
20)); // window end
15291533
assertThat(
1530-
output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
1534+
output.get(0).getPaneInfo(),
1535+
equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
15311536
}
15321537

15331538
/** Ensure a closed trigger has its state recorded in the merge result window. */
@@ -1614,7 +1619,8 @@ public void testMergingWithReusedWindow() throws Exception {
16141619
equalTo((BoundedWindow) mergedWindow)));
16151620

16161621
assertThat(
1617-
output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
1622+
output.get(0).getPaneInfo(),
1623+
equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
16181624
}
16191625

16201626
/**
@@ -1661,7 +1667,7 @@ public void testMergingWithClosedRepresentative() throws Exception {
16611667
1, // window start
16621668
18)); // window end
16631669
assertThat(
1664-
output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0)));
1670+
output.get(0).getPaneInfo(), equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0)));
16651671
}
16661672

16671673
/**
@@ -1702,7 +1708,7 @@ public void testMergingWithClosedDoesNotPoison() throws Exception {
17021708
2, // window start
17031709
12)); // window end
17041710
assertThat(
1705-
output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0)));
1711+
output.get(0).getPaneInfo(), equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0)));
17061712
assertThat(
17071713
output.get(1),
17081714
isSingleWindowedValue(
@@ -1711,7 +1717,8 @@ public void testMergingWithClosedDoesNotPoison() throws Exception {
17111717
1, // window start
17121718
13)); // window end
17131719
assertThat(
1714-
output.get(1).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
1720+
output.get(1).getPaneInfo(),
1721+
equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
17151722
}
17161723

17171724
/**
@@ -1811,7 +1818,7 @@ public void testIdempotentEmptyPanesDiscarding() throws Exception {
18111818
// The late pane has the correct indices.
18121819
assertThat(output.get(1).getValue(), contains(3));
18131820
assertThat(
1814-
output.get(1).getPane(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 1, 1)));
1821+
output.get(1).getPaneInfo(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 1, 1)));
18151822

18161823
assertTrue(tester.isMarkedFinished(firstWindow));
18171824
tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
@@ -1850,7 +1857,8 @@ public void testIdempotentEmptyPanesAccumulating() throws Exception {
18501857
assertThat(output.size(), equalTo(1));
18511858
assertThat(output.get(0), isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10));
18521859
assertThat(
1853-
output.get(0).getPane(), equalTo(PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)));
1860+
output.get(0).getPaneInfo(),
1861+
equalTo(PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)));
18541862

18551863
// Fire another timer with no data; the empty pane should not be output even though the
18561864
// trigger is ready to fire
@@ -1868,7 +1876,7 @@ public void testIdempotentEmptyPanesAccumulating() throws Exception {
18681876
// The late pane has the correct indices.
18691877
assertThat(output.get(0).getValue(), containsInAnyOrder(1, 2, 3));
18701878
assertThat(
1871-
output.get(0).getPane(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 1, 1)));
1879+
output.get(0).getPaneInfo(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 1, 1)));
18721880

18731881
assertTrue(tester.isMarkedFinished(firstWindow));
18741882
tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
@@ -2193,17 +2201,17 @@ public void fireNonEmptyOnDrainInGlobalWindow() throws Exception {
21932201
List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
21942202
assertEquals(n / 3, output.size());
21952203
for (int i = 0; i < output.size(); i++) {
2196-
assertEquals(Timing.EARLY, output.get(i).getPane().getTiming());
2197-
assertEquals(i, output.get(i).getPane().getIndex());
2204+
assertEquals(Timing.EARLY, output.get(i).getPaneInfo().getTiming());
2205+
assertEquals(i, output.get(i).getPaneInfo().getIndex());
21982206
assertEquals(3, Iterables.size(output.get(i).getValue()));
21992207
}
22002208

22012209
tester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
22022210

22032211
output = tester.extractOutput();
22042212
assertEquals(1, output.size());
2205-
assertEquals(Timing.ON_TIME, output.get(0).getPane().getTiming());
2206-
assertEquals(n / 3, output.get(0).getPane().getIndex());
2213+
assertEquals(Timing.ON_TIME, output.get(0).getPaneInfo().getTiming());
2214+
assertEquals(n / 3, output.get(0).getPaneInfo().getIndex());
22072215
assertEquals(n - ((n / 3) * 3), Iterables.size(output.get(0).getValue()));
22082216
}
22092217

@@ -2231,17 +2239,17 @@ public void fireEmptyOnDrainInGlobalWindowIfRequested() throws Exception {
22312239
List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
22322240
assertEquals((n + 3) / 4, output.size());
22332241
for (int i = 0; i < output.size(); i++) {
2234-
assertEquals(Timing.EARLY, output.get(i).getPane().getTiming());
2235-
assertEquals(i, output.get(i).getPane().getIndex());
2242+
assertEquals(Timing.EARLY, output.get(i).getPaneInfo().getTiming());
2243+
assertEquals(i, output.get(i).getPaneInfo().getIndex());
22362244
assertEquals(4, Iterables.size(output.get(i).getValue()));
22372245
}
22382246

22392247
tester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
22402248

22412249
output = tester.extractOutput();
22422250
assertEquals(1, output.size());
2243-
assertEquals(Timing.ON_TIME, output.get(0).getPane().getTiming());
2244-
assertEquals((n + 3) / 4, output.get(0).getPane().getIndex());
2251+
assertEquals(Timing.ON_TIME, output.get(0).getPaneInfo().getTiming());
2252+
assertEquals((n + 3) / 4, output.get(0).getPaneInfo().getIndex());
22452253
assertEquals(0, Iterables.size(output.get(0).getValue()));
22462254
}
22472255

0 commit comments

Comments
 (0)