Conversation

@kennknowles
Member

@kennknowles kennknowles commented May 9, 2025

With the addition of extended metadata for elements, we need to make code, especially user code, more robust to added metadata fields. This pull request adds OutputBuilder, which should be used to build new values. Builders should always be obtained by pulling metadata from some context (for example, a currently-in-process element) when possible.

Concretely, this pull request

  • Adds OutputBuilder as a public interface extending WindowedValue.
  • Adds WindowedValues.builder(WindowedValueReceiver) to produce an OutputBuilder that will output to the provided receiver.
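
To illustrate why building from context is more robust than rebuilding a value field by field, here is a minimal self-contained sketch. It is a toy model, not Beam's API: Element, Builder, traceId, and OutputBuilderSketch are hypothetical names, and traceId stands in for any metadata field added after existing callers were written.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class OutputBuilderSketch {
  /** Toy element; traceId stands in for a metadata field added after callers were written. */
  static final class Element<T> {
    final T value;
    final long timestampMillis;
    final String traceId;

    Element(T value, long timestampMillis, String traceId) {
      this.value = value;
      this.timestampMillis = timestampMillis;
      this.traceId = traceId;
    }
  }

  /** Toy builder that inherits all metadata from a template and emits to a receiver. */
  static final class Builder<T> {
    private final T value;
    private final Consumer<Element<T>> receiver;
    private long timestampMillis;
    private final String traceId;

    Builder(Element<?> template, T value, Consumer<Element<T>> receiver) {
      this.value = value;
      this.receiver = receiver;
      this.timestampMillis = template.timestampMillis;
      this.traceId = template.traceId;
    }

    /** Overrides one piece of metadata; everything else stays as inherited. */
    Builder<T> setTimestamp(long timestampMillis) {
      this.timestampMillis = timestampMillis;
      return this;
    }

    void output() {
      receiver.accept(new Element<T>(value, timestampMillis, traceId));
    }
  }

  static String demo() {
    List<Element<Integer>> out = new ArrayList<>();
    Element<String> input = new Element<String>("hello", 8L, "trace-42");
    int length = input.value.length();

    // Decompose and rebuild: every field must be passed by hand, so traceId is lost.
    out.add(new Element<Integer>(length, input.timestampMillis, null));

    // Build from context: unnamed metadata is inherited from the input element.
    new Builder<Integer>(input, length, out::add).output();

    return out.get(0).traceId + "," + out.get(1).traceId;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints "null,trace-42"
  }
}
```

In this toy, adding another metadata field only requires touching the Builder constructor, while every hand-written decompose-and-rebuild call site would need to be found and updated.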

@kennknowles kennknowles changed the title from "DO NOT MERGE: demonstrating impact of OutputBuilder" to "DO NOT MERGE: demonstrating OutputBuilder in Java SDK" May 9, 2025
@kennknowles kennknowles changed the title from "DO NOT MERGE: demonstrating OutputBuilder in Java SDK" to "Feedback requested: introduce OutputBuilder in Java SDK" Jun 2, 2025
@kennknowles kennknowles force-pushed the OutputBuilder branch 3 times, most recently from 2e4c6a7 to c872d38 Compare September 12, 2025 15:49
@kennknowles kennknowles marked this pull request as ready for review September 12, 2025 15:50
@kennknowles kennknowles force-pushed the OutputBuilder branch 2 times, most recently from 6f15615 to 33d702f Compare September 12, 2025 17:09
@kennknowles
Member Author

R: @stankiewicz

@github-actions
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@kennknowles
Member Author

I think Radek you have the most context, and can critique the details where I have missed something. Especially in FnApiDoFnRunner I see a couple spots where I think I must have gotten impatient - wherever I decompose a WindowedValue and then it gets put back together, we will lose metadata (once it is introduced).

I realize I also need some end-user tests of the OutputBuilder. It is very very thoroughly tested by how it is used internal to the SDK but there are no examples of use in a DoFn in an actual pipeline.

@kennknowles
Member Author

I also need to check that the new style does not create noticeable allocation/GC overhead.

@kennknowles kennknowles force-pushed the OutputBuilder branch 2 times, most recently from eecb99d to 58e26b4 Compare September 17, 2025 20:49
       PaneInfo paneInfo) {
     checkTimestamp(timestamp);
-    outputTo(mainOutputConsumer, WindowedValues.of(output, timestamp, windows, paneInfo));
+    builder(output).setTimestamp(timestamp).setWindows(windows).setPaneInfo(paneInfo).output();
Contributor

I just like it so much!


/** Create a Builder that takes element metadata from the provided delegate. */
public static <T> Builder<T> builder(WindowedValue<T> template) {
  return new Builder<T>()
Contributor

This is great, so if we start to expand with new fields, this is the place to propagate them.

Contributor

@stankiewicz stankiewicz left a comment

Read all of it. This will massively help extend WindowedValue with new fields while avoiding the journey of extending each output method. Thanks!

@kennknowles
Member Author

Thank you!

OK, it now looks like my attempts to make the distroless integration tests use the HEAD container may have caused other tests to stop using the HEAD container. I know that internal tests are all green, but I will see which of these tests are actually broken versus not configured to use the right container.

@kennknowles
Member Author

OK! All green and I am going to merge.

@kennknowles kennknowles merged commit 0384a59 into apache:master Sep 23, 2025
52 of 58 checks passed
@kennknowles kennknowles deleted the OutputBuilder branch September 23, 2025 14:29
    .withValue(value)
    .setReceiver(
        windowedValue -> {
          checkTimestamp(windowedValue.getTimestamp());
Contributor

@kellen kellen Nov 14, 2025

Hey @kennknowles, can you explain the reasoning behind checking the timestamp here? In Scio we have some DoFns that propagate the element timestamp, and this additional check breaks their current behavior. Not sure if we've been doing the wrong thing or not.

Member Author

My intention was only to maintain functionality. I am sure that I debugged a failing test around "invalid timestamps are correctly rejected" but I cannot recall exactly which one it is. I'm re-running some suites to find it and I'll report back here. I definitely don't want any currently-correct code to start failing!

Contributor

Thanks! For context, one of the failing tests is for our AsyncDoFn, where we process elements by spinning off a set of futures, awaiting their responses, and outputting those elements back to their original windows with their original timestamps.

The corresponding failing test is here, giving the following exception:

[info] - should propagate element metadata *** FAILED *** (45 milliseconds)
[info]   org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Cannot output with timestamp 1970-01-01T00:00:00.001Z. Output timestamps must be no earlier than the timestamp of the current input or timer (1970-01-01T00:00:00.008Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
[info]   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:377)
[info]   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:345)
[info]   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
[info]   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
[info]   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
[info]   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:442)
[info]   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:381)
[info]   at com.spotify.scio.ScioContext.execute(ScioContext.scala:671)
[info]   at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:658)
[info]   at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:646)
[info]   ...
[info]   Cause: java.lang.IllegalArgumentException: Cannot output with timestamp 1970-01-01T00:00:00.001Z. Output timestamps must be no earlier than the timestamp of the current input or timer (1970-01-01T00:00:00.008Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
[info]   at org.apache.beam.runners.core.SimpleDoFnRunner.checkTimestamp(SimpleDoFnRunner.java:263)
[info]   at org.apache.beam.runners.core.SimpleDoFnRunner.access$1300(SimpleDoFnRunner.java:89)
[info]   at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.lambda$outputWindowedValue$0(SimpleDoFnRunner.java:462)
[info]   at org.apache.beam.sdk.values.WindowedValues$Builder.output(WindowedValues.java:210)
[info]   at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWindowedValue(SimpleDoFnRunner.java:465)
[info]   at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:123)
[info]   at org.apache.beam.sdk.values.WindowedValues$Builder.output(WindowedValues.java:210)
[info]   at org.apache.beam.sdk.transforms.DoFn$OutputReceiver.outputWindowedValue(DoFn.java:416)
[info]   at com.spotify.scio.transforms.BaseAsyncLookupDoFn.lambda$processElement$0(BaseAsyncLookupDoFn.java:190)
[info]   at com.spotify.scio.transforms.BaseAsyncLookupDoFn.flush(BaseAsyncLookupDoFn.java:310)
[info]   ...
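
The rule stated in the exception message can be sketched as a small standalone check. This is an illustration only, not the code in SimpleDoFnRunner: TimestampSkewSketch and isAllowed are hypothetical names, java.time is used for convenience, and the upper bound used in main is an assumed stand-in for the maximum timestamp quoted in the message.

```java
import java.time.Duration;
import java.time.Instant;

final class TimestampSkewSketch {
  /**
   * Returns true when the output timestamp is no earlier than (input - allowedSkew)
   * and no later than maxTimestamp, mirroring the wording of the exception message.
   */
  static boolean isAllowed(
      Instant output, Instant input, Duration allowedSkew, Instant maxTimestamp) {
    Instant earliest = input.minus(allowedSkew);
    return !output.isBefore(earliest) && !output.isAfter(maxTimestamp);
  }

  public static void main(String[] args) {
    // Assumed upper bound for illustration; the real bound comes from the SDK.
    Instant max = Instant.ofEpochMilli(Long.MAX_VALUE / 1000);
    Instant output = Instant.ofEpochMilli(1); // 1970-01-01T00:00:00.001Z
    Instant input = Instant.ofEpochMilli(8); // 1970-01-01T00:00:00.008Z

    // Zero skew, as in the failing test: 1 ms is earlier than 8 ms, so it is rejected.
    System.out.println(isAllowed(output, input, Duration.ZERO, max)); // prints "false"

    // A skew of at least 7 ms would make the same output acceptable.
    System.out.println(isAllowed(output, input, Duration.ofMillis(7), max)); // prints "true"
  }
}
```

With the values from the trace, the check fails only because the output is compared against an input timestamp of 8 ms rather than the 1 ms of the element being emitted.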

Member Author

@kennknowles kennknowles Nov 17, 2025

Interesting. This is actually a red herring. The error is not coming from this checkTimestamp. This is the portable / Dataflow v2 path.

The SimpleDoFnRunner is the v1 codepath and I see it now:

I've updated #36822 to remove that check. On the other hand, the check looks to be correct. I wonder if the problem is that elem is not captured by the lambda and is instead read as a field later, after the current element has changed: when the AsyncDoFn outputs, the timestamp is validated against a later element. SimpleDoFnRunner is not designed for concurrent or async use, so that could be the problem. A simple fix that would preserve safety would be to capture it in a local variable that ends up in the closure of the lambda.
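
The suspected failure mode and the proposed fix can be sketched without any Beam types. Everything here is hypothetical (ClosureCaptureSketch, Runner, and the method names are made up for illustration), and the timestamps 1 and 8 simply echo the values in the stack trace above; this is not the actual SimpleDoFnRunner code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

final class ClosureCaptureSketch {
  /** Hypothetical runner that tracks the current element's timestamp in a mutable field. */
  static final class Runner {
    private long currentTimestamp;

    void startElement(long timestamp) {
      currentTimestamp = timestamp;
    }

    /** The lambda reads the field when it runs, so it may see a later element. */
    LongConsumer receiverReadingField(List<String> log) {
      return output -> log.add(output >= currentTimestamp ? "ok" : "rejected");
    }

    /** The field is copied to a local first, so the closure keeps this element's value. */
    LongConsumer receiverWithLocalCapture(List<String> log) {
      final long captured = currentTimestamp;
      return output -> log.add(output >= captured ? "ok" : "rejected");
    }
  }

  static List<String> run(boolean captureLocally) {
    Runner runner = new Runner();
    List<String> log = new ArrayList<>();

    // The element with timestamp 1 starts async work and keeps a receiver for later.
    runner.startElement(1L);
    LongConsumer deferred =
        captureLocally ? runner.receiverWithLocalCapture(log) : runner.receiverReadingField(log);

    // Before the async work completes, the runner moves on to the element with timestamp 8.
    runner.startElement(8L);

    // The deferred output uses the original element's timestamp.
    deferred.accept(1L);
    return log;
  }

  public static void main(String[] args) {
    System.out.println(run(false)); // prints "[rejected]"
    System.out.println(run(true)); // prints "[ok]"
  }
}
```

The local copy makes the validation independent of whatever element the runner is processing when the deferred output finally happens, while still rejecting outputs that are genuinely too early for their own input.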

Member Author

https://github.com/apache/beam/pull/36838/files might fix it, if you have a test environment where it is easy to pull this in and try it (or you can test against a snapshot once the nightlies run).
