Commit 1bfc732

Merge branch 'master' into enable_set_and_map_tests
2 parents: 4d1241e + 5732086

File tree: 37 files changed, +1103 -1140 lines

37 files changed

+1103
-1140
lines changed

.github/actions/dind-up-action/action.yml

Lines changed: 6 additions & 6 deletions
@@ -43,7 +43,7 @@ inputs:
   storage-driver:
     default: overlay2
   additional-dockerd-args:
-    default: ""
+    default: "--tls=false"
   use-host-network:
     description: "Run DinD with --network host instead of publishing a TCP port."
     default: "false"

@@ -206,20 +206,20 @@ runs:
      run: |
        set -euo pipefail
        NAME="${{ inputs.container-name || 'dind-daemon' }}"
-
+
        # Use host daemon to inspect the DinD container
        nm=$(docker inspect -f '{{.HostConfig.NetworkMode}}' "$NAME")
        echo "DinD NetworkMode=${nm}"

        # Try to find the bridge network IP
        ip=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' "$NAME" || true)
-
+
        # If still empty, likely host networking -> use loopback
        if [[ -z "${ip}" || "${nm}" == "host" ]]; then
          echo "No bridge IP found or using host network. Falling back to 127.0.0.1."
          ip="127.0.0.1"
        fi
-
+
        echo "Discovered DinD IP: ${ip}"
        echo "dind-ip=${ip}" >> "$GITHUB_OUTPUT"

@@ -237,7 +237,7 @@ runs:
        hostport=$(docker port redis-smoke 6379/tcp | sed 's/.*://')
        echo "Redis container started, mapped to host port ${hostport}"
        echo "Probing connection to ${DIND_IP}:${hostport} ..."
-
+
        timeout 5 bash -c 'exec 3<>/dev/tcp/$DIND_IP/'"$hostport"
        if [[ $? -eq 0 ]]; then
          echo "TCP connection successful. Port mapping is working."

@@ -272,4 +272,4 @@ runs:
      shell: bash
      run: |
        echo "DOCKER_HOST=${{ steps.set-output.outputs.docker-host }}" >> "$GITHUB_ENV"
-        echo "DIND_IP=${{ steps.discover-ip.outputs.dind-ip }}" >> "$GITHUB_ENV"
+        echo "DIND_IP=${{ steps.discover-ip.outputs.dind-ip }}" >> "$GITHUB_ENV"
Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 3
+  "modification": 4
 }

.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json

Lines changed: 2 additions & 1 deletion
@@ -5,5 +5,6 @@
   "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
   "https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test",
   "https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test",
-  "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
+  "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface",
+  "https://github.com/apache/beam/pull/36631": "dofn lifecycle",
 }
Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 {
   "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 1,
+  "modification": 2,
 }

.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.json

Lines changed: 0 additions & 2 deletions
@@ -1,6 +1,4 @@
 {
-  "https://github.com/apache/beam/pull/35213": "Eliminating getPane() in favor of getPaneInfo()",
-  "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
   "comment": "Modify this file in a trivial way to cause this test suite to run",
   "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
   "https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
Lines changed: 82 additions & 0 deletions
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Adding/Removing Python Versions in Apache Beam

Python releases are now on an annual cadence, with a new version released (and an old version reaching end-of-life) each October. This means that at any given time, Beam could be supporting up to five different versions of Python. Removing EOL versions is a higher priority than adding new ones, because EOL Python versions may not receive vulnerability fixes when dependencies ship them only for newer versions.

## Adding a Python Version

1. Upgrade Beam's direct dependencies to versions that support the new Python version. Complex libraries, like pyarrow or numpy, need to provide wheels for the new Python version. Infrastructure libraries, such as Beam build dependencies, cibuildwheel, and other libraries with a hardcoded version, may have to be upgraded as well.
   * Some dependency versions may not support both the minimum and maximum Python version for Beam and will require version-specific dependencies (see the sketch after this section).

1. Add a Beam Python container for the new Python version.
   * https://github.com/apache/beam/tree/master/sdks/python/container

1. Add the new Python version to the various test suites:
   * [Tox test suites](https://github.com/apache/beam/blob/master/sdks/python/tox.ini)
   * Gradle tasks such as pre-commits, post-commits, etc.
   * Runner-specific versioning checks
   * Fix any tests that fail on the new Python version.
     * Typically, a new Python version requires updating Beam's type inference code. See https://github.com/apache/beam/issues/31047

1. Add the GitHub Actions workflows for the new Python version.
   * Example: https://github.com/apache/beam/blob/master/.github/workflows/python_tests.yml
   * The minimum and maximum Python versions are defined in a number of workflows and in the [test-properties.json](https://github.com/apache/beam/blob/ce1b1dcbc596d1e7c914ee0f7b0d48f2d2bf87e1/.github/actions/setup-default-test-properties/test-properties.json) file; expect potentially hundreds of changes in this step.

1. Add support for building wheels for the new Python version.
   * https://github.com/apache/beam/blob/master/.github/workflows/build_wheels.yml

1. Update the upper limit in [__init__.py](https://github.com/apache/beam/blob/0ef5d3a185c1420da118208353ceb0b40b3a27c9/sdks/python/apache_beam/__init__.py#L78) with the next Python version.

1. Add the new Python version to the release validation scripts: https://github.com/apache/beam/pull/31415

* If a new feature is needed, or a regression appears, when adding a new Python version, please file an [issue](https://github.com/apache/beam/issues).
* **All unit tests and integration tests must pass before merging the new version.**
* If you are a non-committer, please ask the committers to run a seed job on your PR to test all the new changes.

For an example, see the PRs associated with https://github.com/apache/beam/issues/29149 and the commits on https://github.com/apache/beam/pull/30828, which add Python 3.12 support.
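As a concrete illustration of the version-specific dependencies mentioned in step 1: Python packaging supports environment markers, so a single requirement list can pin different versions per interpreter. A minimal sketch, assuming hypothetical packages and bounds (these are not Beam's actual constraints):

```python
from setuptools import setup

setup(
    name="example-package",  # placeholder, not apache-beam
    install_requires=[
        # Older interpreters stay on the widely available wheel line...
        'numpy>=1.24,<2; python_version < "3.13"',
        # ...while the newly added interpreter needs a release that
        # actually ships wheels for it.
        'numpy>=1.26,<3; python_version >= "3.13"',
    ],
)
```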
## Removing a Python Version

1. Bump the Python version in [setup.py](https://github.com/apache/beam/blob/0ef5d3a185c1420da118208353ceb0b40b3a27c9/sdks/python/setup.py#L152) and update the Python version warning in [__init__.py](https://github.com/apache/beam/blob/0ef5d3a185c1420da118208353ceb0b40b3a27c9/sdks/python/apache_beam/__init__.py#L78) (a sketch of this check follows this section).

1. Remove test suites for the unsupported Python version:
   * Migrate GitHub Actions workflows from the deprecated Python version to the next one.
     * Example PR: https://github.com/apache/beam/pull/32429
     * Make these changes on a branch in the main Beam repository if possible, so you can execute the new workflows directly for testing.
     * Some workflows only run on the minimum supported Python version (like the linting and coverage precommits). These may use libraries that need updates to run on the next Python version.
   * Remove the unsupported Python version from the following files/directories:
     * sdks/python/test-suites/gradle.properties
     * apache_beam/testing/tox
       * Move any workflows that exist only for the minimum Python version from tox/py3X to the next minimum Python version's folder.
     * apache_beam/testing/dataflow
     * apache_beam/testing/direct
     * apache_beam/testing/portable
   * Remove the unsupported Python version's Gradle tasks from:
     * build.gradle.kts
     * settings.gradle.kts
     * buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
   * Remove support for building wheels and source distributions for the unsupported Python version from [.github/workflows/build_wheels.yml](https://github.com/apache/beam/blob/ce1b1dcbc596d1e7c914ee0f7b0d48f2d2bf87e1/.github/workflows/build_wheels.yml).
   * Remove the unsupported Python version from [sdks/python/tox.ini](https://github.com/apache/beam/blob/master/sdks/python/tox.ini).

1. Delete the unsupported Python version's containers from [sdks/python/container](https://github.com/apache/beam/tree/master/sdks/python/container).

1. Clean up any code that applies to the removed Python version.
   * This will usually be version-specific dependencies in setup.py or branches in the typehinting module.
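For reference, the version warning mentioned in step 1 is an interpreter check along these lines. This is a minimal sketch with hypothetical bounds; consult the real `sdks/python/apache_beam/__init__.py` for Beam's actual limits and wording:

```python
import sys
import warnings

if sys.version_info.major == 3:
    # The lower bound rises when a version is removed; the upper bound rises
    # when a version is added. 3.9 and 3.14 here are placeholders.
    if sys.version_info.minor < 9 or sys.version_info.minor >= 14:
        warnings.warn(
            "This version of Apache Beam has not been sufficiently tested on "
            f"Python {sys.version_info.major}.{sys.version_info.minor}. "
            "You may encounter bugs or missing features.")
else:
    raise RuntimeError("Apache Beam requires Python 3.")
```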

runners/google-cloud-dataflow-java/build.gradle

Lines changed: 24 additions & 6 deletions
@@ -204,7 +204,6 @@ def commonLegacyExcludeCategories = [
   'org.apache.beam.sdk.testing.UsesDistributionMetrics',
   'org.apache.beam.sdk.testing.UsesGaugeMetrics',
   'org.apache.beam.sdk.testing.UsesTestStream',
-  'org.apache.beam.sdk.testing.UsesParDoLifecycle', // doesn't support remote runner
   'org.apache.beam.sdk.testing.UsesMetricsPusher',
   'org.apache.beam.sdk.testing.UsesBundleFinalizer',
   'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', // Dataflow QM as of now does not support returning back BoundedTrie in metric result.

@@ -450,7 +449,17 @@ task validatesRunner {
      excludedTests: [
        // TODO(https://github.com/apache/beam/issues/21472)
        'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
-      ]
+
+        // These tests use static state and don't work with remote execution.
+        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
+        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
+        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
+        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful',
+        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup',
+        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful',
+        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle',
+        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful',
+      ]
    ))
}

@@ -470,7 +479,17 @@ task validatesRunnerStreaming {
        // GroupIntoBatches.withShardedKey not supported on streaming runner v1
        // https://github.com/apache/beam/issues/22592
        'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',
-      ]
+
+        // These tests use static state and don't work with remote execution.
+        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
+        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
+        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
+        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful',
+        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup',
+        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful',
+        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle',
+        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful',
+      ]
    ))
}

@@ -539,8 +558,7 @@ task validatesRunnerV2 {
      excludedTests: [
        'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming',

-        // TODO(https://github.com/apache/beam/issues/18592): respect ParDo lifecycle.
-        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testFnCallSequenceStateful',
+        // These tests use static state and don't work with remote execution.
        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',

@@ -582,7 +600,7 @@ task validatesRunnerV2Streaming {
        'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
        'org.apache.beam.sdk.transforms.GroupByKeyTest.testCombiningAccumulatingProcessingTime',

-        // TODO(https://github.com/apache/beam/issues/18592): respect ParDo lifecycle.
+        // These tests use static state and don't work with remote execution.
        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
        'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
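These exclusions all stem from the same constraint: the ParDoLifecycleTest teardown tests record lifecycle callbacks in static, JVM-process-local state, which a remote runner's workers cannot share with the test process that asserts on it. A miniature Python analogue of the failure mode, illustrative only:

```python
# The class attribute lives in whichever process runs the DoFn,
# not necessarily the process running the test.
class LifecycleRecorder:
    calls = []  # "static" state, visible only within one process

    def teardown(self):
        LifecycleRecorder.calls.append("teardown")

# In-process execution: the assertion sees the recorded call.
LifecycleRecorder().teardown()
assert "teardown" in LifecycleRecorder.calls

# Under remote execution, teardown() runs in a worker process, so the
# test process's LifecycleRecorder.calls stays empty and the same
# assertion fails.
```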

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java

Lines changed: 39 additions & 14 deletions
@@ -105,11 +105,32 @@ public DataflowMapTaskExecutor create(
     Networks.replaceDirectedNetworkNodes(
         network, createOutputReceiversTransform(stageName, counterSet));

-    // Swap out all the ParallelInstruction nodes with Operation nodes
-    Networks.replaceDirectedNetworkNodes(
-        network,
-        createOperationTransformForParallelInstructionNodes(
-            stageName, network, options, readerFactory, sinkFactory, executionContext));
+    // Swap out all the ParallelInstruction nodes with Operation nodes. While updating the
+    // network, we keep track of the created Operations so that if an exception is encountered
+    // we can properly abort started operations.
+    ArrayList<Operation> createdOperations = new ArrayList<>();
+    try {
+      Networks.replaceDirectedNetworkNodes(
+          network,
+          createOperationTransformForParallelInstructionNodes(
+              stageName,
+              network,
+              options,
+              readerFactory,
+              sinkFactory,
+              executionContext,
+              createdOperations));
+    } catch (RuntimeException exn) {
+      for (Operation o : createdOperations) {
+        try {
+          o.abort();
+        } catch (Exception exn2) {
+          exn.addSuppressed(exn2);
+        }
+      }
+      throw exn;
+    }

     // Collect all the operations within the network and attach all the operations as receivers
     // to preceding output receivers.

@@ -144,7 +165,8 @@ Function<Node, Node> createOperationTransformForParallelInstructionNodes(
       final PipelineOptions options,
       final ReaderFactory readerFactory,
       final SinkFactory sinkFactory,
-      final DataflowExecutionContext<?> executionContext) {
+      final DataflowExecutionContext<?> executionContext,
+      final List<Operation> createdOperations) {

     return new TypeSafeNodeFunction<ParallelInstructionNode>(ParallelInstructionNode.class) {
       @Override

@@ -156,27 +178,31 @@ public Node typedApply(ParallelInstructionNode node) {
                 instruction.getOriginalName(),
                 instruction.getSystemName(),
                 instruction.getName());
+        OperationNode result;
         try {
           DataflowOperationContext context = executionContext.createOperationContext(nameContext);
           if (instruction.getRead() != null) {
-            return createReadOperation(
-                network, node, options, readerFactory, executionContext, context);
+            result =
+                createReadOperation(
+                    network, node, options, readerFactory, executionContext, context);
           } else if (instruction.getWrite() != null) {
-            return createWriteOperation(node, options, sinkFactory, executionContext, context);
+            result = createWriteOperation(node, options, sinkFactory, executionContext, context);
           } else if (instruction.getParDo() != null) {
-            return createParDoOperation(network, node, options, executionContext, context);
+            result = createParDoOperation(network, node, options, executionContext, context);
           } else if (instruction.getPartialGroupByKey() != null) {
-            return createPartialGroupByKeyOperation(
-                network, node, options, executionContext, context);
+            result =
+                createPartialGroupByKeyOperation(network, node, options, executionContext, context);
           } else if (instruction.getFlatten() != null) {
-            return createFlattenOperation(network, node, context);
+            result = createFlattenOperation(network, node, context);
           } else {
             throw new IllegalArgumentException(
                 String.format("Unexpected instruction: %s", instruction));
           }
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
+        createdOperations.add(result.getOperation());
+        return result;
       }
     };
   }

@@ -328,7 +354,6 @@ public Node typedApply(InstructionOutputNode input) {
         Coder<?> coder =
             CloudObjects.coderFromCloudObject(CloudObject.fromSpec(cloudOutput.getCodec()));

-        @SuppressWarnings("unchecked")
         ElementCounter outputCounter =
             new DataflowOutputCounter(
                 cloudOutput.getName(),
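The shape of this change: every Operation created while rewriting the network is appended to createdOperations, and if a later creation throws, the already-started operations are aborted before the exception propagates, with abort failures attached via addSuppressed. A sketch of the same pattern in Python, illustrative only (not the worker code; `add_note` stands in for Java's addSuppressed and requires Python 3.11+):

```python
def create_all(specs, create, abort):
    """Create one resource per spec; on failure, abort those already created."""
    created = []
    try:
        for spec in specs:
            created.append(create(spec))
    except Exception as exn:
        for op in created:
            try:
                abort(op)
            except Exception as abort_exn:
                # Parallels Java's Throwable.addSuppressed (Python 3.11+).
                exn.add_note(f"abort also failed: {abort_exn!r}")
        # Re-raise the original error, now carrying any cleanup failures.
        raise
    return created
```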
