From 00baab0e4fc6e487614ccb5c4107ea648e7e128a Mon Sep 17 00:00:00 2001 From: Nelesh Singla <117123879+nsingla@users.noreply.github.com> Date: Tue, 2 Dec 2025 14:23:20 -0500 Subject: [PATCH] adding support for dsl.condition and dsl.parallelFor to docker runner Signed-off-by: Nelesh Singla <117123879+nsingla@users.noreply.github.com> --- .github/workflows/kfp-sdk-tests.yml | 5 +- .github/workflows/kfp-sdk-unit-tests.yml | 2 +- sdk/python/kfp/local/dag_orchestrator.py | 347 ------- sdk/python/kfp/local/docker_task_handler.py | 22 +- .../kfp/local/docker_task_handler_test.py | 6 +- sdk/python/kfp/local/orchestrator/__init__.py | 13 + .../local/orchestrator/dag_orchestrator.py | 128 +++ .../orchestrator/enhanced_dag_orchestrator.py | 944 ++++++++++++++++++ .../local/orchestrator/orchestrator_utils.py | 347 +++++++ sdk/python/kfp/local/pipeline_orchestrator.py | 44 +- .../kfp/local/pipeline_orchestrator_test.py | 39 +- .../local_execution/local_execution_test.py | 34 +- .../pipeline_with_parallelfor_parallelism.py | 2 +- 13 files changed, 1548 insertions(+), 385 deletions(-) delete mode 100644 sdk/python/kfp/local/dag_orchestrator.py create mode 100755 sdk/python/kfp/local/orchestrator/__init__.py create mode 100644 sdk/python/kfp/local/orchestrator/dag_orchestrator.py create mode 100644 sdk/python/kfp/local/orchestrator/enhanced_dag_orchestrator.py create mode 100644 sdk/python/kfp/local/orchestrator/orchestrator_utils.py diff --git a/.github/workflows/kfp-sdk-tests.yml b/.github/workflows/kfp-sdk-tests.yml index 1cc79624d5c..133e16ce79a 100644 --- a/.github/workflows/kfp-sdk-tests.yml +++ b/.github/workflows/kfp-sdk-tests.yml @@ -20,9 +20,10 @@ concurrency: jobs: sdk-tests: runs-on: ubuntu-latest + timeout-minutes: 40 strategy: matrix: - python-version: ['3.9', '3.13'] + python-version: ['3.11', '3.13'] steps: - name: Checkout code @@ -57,7 +58,7 @@ jobs: sudo systemctl enable docker sudo usermod -aG docker "$USER" # Wait for Docker to be ready - timeout 30 bash -c 'until docker info > /dev/null 2>&1; do sleep 1; done' + timeout 5 bash -c 'until docker info > /dev/null 2>&1; do sleep 1; done' docker info - name: Run SDK Tests diff --git a/.github/workflows/kfp-sdk-unit-tests.yml b/.github/workflows/kfp-sdk-unit-tests.yml index 9f7e3c084c3..18d7f45d3b1 100644 --- a/.github/workflows/kfp-sdk-unit-tests.yml +++ b/.github/workflows/kfp-sdk-unit-tests.yml @@ -21,7 +21,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ['3.9', '3.13'] + python-version: ['3.11', '3.13'] steps: - name: Checkout code diff --git a/sdk/python/kfp/local/dag_orchestrator.py b/sdk/python/kfp/local/dag_orchestrator.py deleted file mode 100644 index 229705c4e2a..00000000000 --- a/sdk/python/kfp/local/dag_orchestrator.py +++ /dev/null @@ -1,347 +0,0 @@ -# Copyright 2024 The Kubeflow Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
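For orientation, here is a minimal sketch of the kind of pipeline this patch aims to make runnable under the local Docker runner. It is not part of the change: the pipeline, component bodies, and base image are illustrative assumptions; only dsl.Condition, dsl.ParallelFor, and the kfp.local DockerRunner entry point are the pieces the patch actually targets.

from kfp import dsl, local


@dsl.component(base_image='python:3.11')
def flip_coin() -> str:
    import random
    return random.choice(['heads', 'tails'])


@dsl.component(base_image='python:3.11')
def echo(msg: str) -> str:
    print(msg)
    return msg


@dsl.pipeline
def control_flow_pipeline():
    flip = flip_coin()
    # dsl.Condition: run the inner task only when the predicate holds.
    with dsl.Condition(flip.output == 'heads'):
        echo(msg='got heads')
    # dsl.ParallelFor: fan out one echo task per item, at most 2 at a time.
    with dsl.ParallelFor(items=['a', 'b', 'c'], parallelism=2) as item:
        echo(msg=item)


if __name__ == '__main__':
    local.init(runner=local.DockerRunner())
    control_flow_pipeline()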
-"""Code for locally executing a DAG within a pipeline.""" -import copy -from typing import Any, Dict, List, Tuple - -from kfp.local import config -from kfp.local import graph_utils -from kfp.local import importer_handler -from kfp.local import io -from kfp.local import status -from kfp.pipeline_spec import pipeline_spec_pb2 - -Outputs = Dict[str, Any] - - -def run_dag( - pipeline_resource_name: str, - dag_component_spec: pipeline_spec_pb2.ComponentSpec, - executors: Dict[str, - pipeline_spec_pb2.PipelineDeploymentConfig.ExecutorSpec], - components: Dict[str, pipeline_spec_pb2.ComponentSpec], - dag_arguments: Dict[str, Any], - pipeline_root: str, - runner: config.LocalRunnerType, - unique_pipeline_id: str, - fail_stack: List[str], -) -> Tuple[Outputs, status.Status]: - """Runs a DAGSpec. - - Args: - pipeline_resource_name: The root pipeline resource name. - dag_component_spec: The ComponentSpec which defines the DAG to execute. - executors: The ExecutorSpecs corresponding to the DAG. - components: The ComponentSpecs corresponding to the DAG. - dag_arguments: The arguments to the DAG's outer ComponentSpec. - io_store: The IOStore instance corresponding to this DAG. - pipeline_root: The local pipeline root. - runner: The user-specified local runner. - unique_pipeline_id: A unique identifier for the pipeline for placeholder resolution. - fail_stack: Mutable stack of failures. If a primitive task in the DAG fails, the task name is appended. If a multi-task DAG fails, the DAG name is appended. If the pipeline executes successfully, fail_stack will be empty throughout the full local execution call stack. - - Returns: - A two-tuple of (outputs, status). If status is FAILURE, outputs is an empty dictionary. - """ - from kfp.local import task_dispatcher - - dag_arguments_with_defaults = join_user_inputs_and_defaults( - dag_arguments=dag_arguments, - dag_inputs_spec=dag_component_spec.input_definitions, - ) - - # prepare IOStore for DAG - io_store = io.IOStore() - for k, v in dag_arguments_with_defaults.items(): - io_store.put_parent_input(k, v) - - # execute tasks in order - dag_spec = dag_component_spec.dag - sorted_tasks = graph_utils.topological_sort_tasks(dag_spec.tasks) - while sorted_tasks: - task_name = sorted_tasks.pop() - task_spec = dag_spec.tasks[task_name] - # TODO: support control flow features - validate_task_spec_not_loop_or_condition(task_spec=task_spec) - component_name = task_spec.component_ref.name - component_spec = components[component_name] - implementation = component_spec.WhichOneof('implementation') - if implementation == 'dag': - # unlikely to exceed default max recursion depth of 1000 - outputs, task_status = run_dag( - pipeline_resource_name=pipeline_resource_name, - dag_component_spec=component_spec, - components=components, - executors=executors, - dag_arguments=make_task_arguments( - task_spec.inputs, - io_store, - ), - pipeline_root=pipeline_root, - runner=runner, - unique_pipeline_id=unique_pipeline_id, - fail_stack=fail_stack, - ) - - elif implementation == 'executor_label': - executor_spec = executors[component_spec.executor_label] - task_arguments = make_task_arguments( - task_inputs_spec=dag_spec.tasks[task_name].inputs, - io_store=io_store, - ) - - if executor_spec.WhichOneof('spec') == 'importer': - outputs, task_status = importer_handler.run_importer( - pipeline_resource_name=pipeline_resource_name, - component_name=component_name, - component_spec=component_spec, - executor_spec=executor_spec, - arguments=task_arguments, - pipeline_root=pipeline_root, - 
unique_pipeline_id=unique_pipeline_id, - ) - elif executor_spec.WhichOneof('spec') == 'container': - outputs, task_status = task_dispatcher.run_single_task_implementation( - pipeline_resource_name=pipeline_resource_name, - component_name=component_name, - component_spec=component_spec, - executor_spec=executor_spec, - arguments=task_arguments, - pipeline_root=pipeline_root, - runner=runner, - # let the outer pipeline raise the error - raise_on_error=False, - # components may consume input artifacts when passed from upstream - # outputs or parent component inputs - block_input_artifact=False, - # provide the same unique job id for each task for - # consistent placeholder resolution - unique_pipeline_id=unique_pipeline_id, - ) - else: - raise ValueError( - "Got unknown spec in ExecutorSpec. Only 'dsl.component', 'dsl.container_component', and 'dsl.importer' are supported in local pipeline execution." - ) - else: - raise ValueError( - f'Got unknown component implementation: {implementation}') - - if task_status == status.Status.FAILURE: - fail_stack.append(task_name) - return {}, status.Status.FAILURE - - # update IO store on success - elif task_status == status.Status.SUCCESS: - for key, output in outputs.items(): - io_store.put_task_output( - task_name, - key, - output, - ) - else: - raise ValueError(f'Got unknown task status: {task_status.name}') - - dag_outputs = get_dag_outputs( - dag_outputs_spec=dag_component_spec.dag.outputs, - io_store=io_store, - ) - return dag_outputs, status.Status.SUCCESS - - -def join_user_inputs_and_defaults( - dag_arguments: Dict[str, Any], - dag_inputs_spec: pipeline_spec_pb2.ComponentInputsSpec, -) -> Dict[str, Any]: - """Collects user-provided arguments and default arguments (when no user- - provided argument) into a dictionary. Returns the dictionary. - - Args: - dag_arguments: The user-provided arguments to the DAG. - dag_inputs_spec: The ComponentInputSpec for the DAG. - - Returns: - The complete DAG inputs, with defaults included where the user-provided argument is missing. - """ - from kfp.local import executor_output_utils - - copied_dag_arguments = copy.deepcopy(dag_arguments) - - for input_name, input_spec in dag_inputs_spec.parameters.items(): - if input_name not in copied_dag_arguments: - copied_dag_arguments[ - input_name] = executor_output_utils.pb2_value_to_python( - input_spec.default_value) - return copied_dag_arguments - - -def make_task_arguments( - task_inputs_spec: pipeline_spec_pb2.TaskInputsSpec, - io_store: io.IOStore, -) -> Dict[str, Any]: - """Obtains a dictionary of arguments required to execute the task - corresponding to TaskInputsSpec. - - Args: - task_inputs_spec: The TaskInputsSpec for the task. - io_store: The IOStore of the current DAG. Used to obtain task arguments which come from upstream task outputs and parent component inputs. - - Returns: - The arguments for the task. 
- """ - from kfp.local import executor_output_utils - - task_arguments = {} - # handle parameters - for input_name, input_spec in task_inputs_spec.parameters.items(): - - # handle constants - if input_spec.HasField('runtime_value'): - # runtime_value's value should always be constant for the v2 compiler - if input_spec.runtime_value.WhichOneof('value') != 'constant': - raise ValueError('Expected constant.') - task_arguments[ - input_name] = executor_output_utils.pb2_value_to_python( - input_spec.runtime_value.constant) - - # handle upstream outputs - elif input_spec.HasField('task_output_parameter'): - task_arguments[input_name] = io_store.get_task_output( - input_spec.task_output_parameter.producer_task, - input_spec.task_output_parameter.output_parameter_key, - ) - - # handle parent pipeline input parameters - elif input_spec.HasField('component_input_parameter'): - task_arguments[input_name] = io_store.get_parent_input( - input_spec.component_input_parameter) - - # TODO: support dsl.ExitHandler - elif input_spec.HasField('task_final_status'): - raise NotImplementedError( - "'dsl.ExitHandler' is not yet support for local execution.") - - else: - raise ValueError(f'Missing input for parameter {input_name}.') - - # handle artifacts - for input_name, input_spec in task_inputs_spec.artifacts.items(): - if input_spec.HasField('task_output_artifact'): - task_arguments[input_name] = io_store.get_task_output( - input_spec.task_output_artifact.producer_task, - input_spec.task_output_artifact.output_artifact_key, - ) - elif input_spec.HasField('component_input_artifact'): - task_arguments[input_name] = io_store.get_parent_input( - input_spec.component_input_artifact) - else: - raise ValueError(f'Missing input for artifact {input_name}.') - - return task_arguments - - -def get_dag_output_parameters( - dag_outputs_spec: pipeline_spec_pb2.DagOutputsSpec, - io_store: io.IOStore, -) -> Dict[str, Any]: - """Gets the DAG output parameter values from a DagOutputsSpec and the DAG's - IOStore. - - Args: - dag_outputs_spec: DagOutputsSpec corresponding to the DAG. - io_store: IOStore corresponding to the DAG. - - Returns: - The DAG output parameters. - """ - outputs = {} - for root_output_key, parameter_selector_spec in dag_outputs_spec.parameters.items( - ): - kind = parameter_selector_spec.WhichOneof('kind') - if kind == 'value_from_parameter': - value_from_parameter = parameter_selector_spec.value_from_parameter - outputs[root_output_key] = io_store.get_task_output( - value_from_parameter.producer_subtask, - value_from_parameter.output_parameter_key, - ) - elif kind == 'value_from_oneof': - raise NotImplementedError( - "'dsl.OneOf' is not yet supported in local execution.") - else: - raise ValueError( - f"Got unknown 'parameter_selector_spec' kind: {kind}") - return outputs - - -def get_dag_output_artifacts( - dag_outputs_spec: pipeline_spec_pb2.DagOutputsSpec, - io_store: io.IOStore, -) -> Dict[str, Any]: - """Gets the DAG output artifact values from a DagOutputsSpec and the DAG's - IOStore. - - Args: - dag_outputs_spec: DagOutputsSpec corresponding to the DAG. - io_store: IOStore corresponding to the DAG. - - Returns: - The DAG output artifacts. - """ - outputs = {} - for root_output_key, artifact_selector_spec in dag_outputs_spec.artifacts.items( - ): - len_artifact_selectors = len(artifact_selector_spec.artifact_selectors) - if len_artifact_selectors != 1: - raise ValueError( - f'Expected 1 artifact in ArtifactSelectorSpec. 
Got: {len_artifact_selectors}' - ) - artifact_selector = artifact_selector_spec.artifact_selectors[0] - outputs[root_output_key] = io_store.get_task_output( - artifact_selector.producer_subtask, - artifact_selector.output_artifact_key, - ) - return outputs - - -def get_dag_outputs( - dag_outputs_spec: pipeline_spec_pb2.DagOutputsSpec, - io_store: io.IOStore, -) -> Dict[str, Any]: - """Gets the DAG output values from a DagOutputsSpec and the DAG's IOStore. - - Args: - dag_outputs_spec: DagOutputsSpec corresponding to the DAG. - io_store: IOStore corresponding to the DAG. - - Returns: - The DAG outputs. - """ - output_params = get_dag_output_parameters( - dag_outputs_spec=dag_outputs_spec, - io_store=io_store, - ) - output_artifacts = get_dag_output_artifacts( - dag_outputs_spec=dag_outputs_spec, - io_store=io_store, - ) - return {**output_params, **output_artifacts} - - -def validate_task_spec_not_loop_or_condition( - task_spec: pipeline_spec_pb2.PipelineTaskSpec) -> None: - if task_spec.trigger_policy.condition: - raise NotImplementedError( - "'dsl.Condition' is not supported by local pipeline execution.") - elif task_spec.WhichOneof('iterator'): - raise NotImplementedError( - "'dsl.ParallelFor' is not supported by local pipeline execution.") diff --git a/sdk/python/kfp/local/docker_task_handler.py b/sdk/python/kfp/local/docker_task_handler.py index 39b5f25e323..db0270ccc9a 100755 --- a/sdk/python/kfp/local/docker_task_handler.py +++ b/sdk/python/kfp/local/docker_task_handler.py @@ -117,9 +117,21 @@ def run_docker_container(client: 'docker.DockerClient', image: str, stdout=True, stderr=True, volumes=volumes, + auto_remove=True, **container_run_args) - for line in container.logs(stream=True): - # the inner logs should already have trailing \n - # we do not need to add another - print(line.decode(), end='') - return container.wait()['StatusCode'] + try: + for line in container.logs(stream=True): + # the inner logs should already have trailing \n + # we do not need to add another + print(line.decode(), end='') + except docker.errors.NotFound: + # Container was auto-removed before we could stream logs + # This can happen if the container exits very quickly + pass + try: + return container.wait()['StatusCode'] + except docker.errors.NotFound: + # Container was auto-removed after logs completed + # This means the container exited successfully (logs streaming completed) + # We assume success since we couldn't get the actual status code + return 0 diff --git a/sdk/python/kfp/local/docker_task_handler_test.py b/sdk/python/kfp/local/docker_task_handler_test.py index 7a08f062ce7..e7954684397 100755 --- a/sdk/python/kfp/local/docker_task_handler_test.py +++ b/sdk/python/kfp/local/docker_task_handler_test.py @@ -66,6 +66,7 @@ def test_no_volumes(self): stdout=True, stderr=True, volumes={}, + auto_remove=True, ) def test_cwd_volume(self): @@ -88,7 +89,9 @@ def test_cwd_volume(self): volumes={current_test_dir: { 'bind': '/localdir', 'mode': 'ro' - }}) + }}, + auto_remove=True, + ) class TestDockerTaskHandler(DockerMockTestCase): @@ -236,6 +239,7 @@ def test_run(self): 'mode': 'rw' } }, + auto_remove=True, ) def test_pipeline_root_relpath(self): diff --git a/sdk/python/kfp/local/orchestrator/__init__.py b/sdk/python/kfp/local/orchestrator/__init__.py new file mode 100755 index 00000000000..781c622799f --- /dev/null +++ b/sdk/python/kfp/local/orchestrator/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2023 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not 
use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/sdk/python/kfp/local/orchestrator/dag_orchestrator.py b/sdk/python/kfp/local/orchestrator/dag_orchestrator.py new file mode 100644 index 00000000000..b4f525bc0da --- /dev/null +++ b/sdk/python/kfp/local/orchestrator/dag_orchestrator.py @@ -0,0 +1,128 @@ +# Copyright 2024 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Code for locally executing a DAG within a pipeline.""" +from typing import Any, Dict, List, Tuple + +from kfp.local import config +from kfp.local import graph_utils +from kfp.local import io +from kfp.local import status +from kfp.pipeline_spec import pipeline_spec_pb2 + +Outputs = Dict[str, Any] + + +def run_dag( + pipeline_resource_name: str, + dag_component_spec: pipeline_spec_pb2.ComponentSpec, + executors: Dict[str, + pipeline_spec_pb2.PipelineDeploymentConfig.ExecutorSpec], + components: Dict[str, pipeline_spec_pb2.ComponentSpec], + dag_arguments: Dict[str, Any], + pipeline_root: str, + runner: config.LocalRunnerType, + unique_pipeline_id: str, + fail_stack: List[str], +) -> Tuple[Outputs, status.Status]: + """Runs a DAGSpec. + + Args: + pipeline_resource_name: The root pipeline resource name. + dag_component_spec: The ComponentSpec which defines the DAG to execute. + executors: The ExecutorSpecs corresponding to the DAG. + components: The ComponentSpecs corresponding to the DAG. + dag_arguments: The arguments to the DAG's outer ComponentSpec. + io_store: The IOStore instance corresponding to this DAG. + pipeline_root: The local pipeline root. + runner: The user-specified local runner. + unique_pipeline_id: A unique identifier for the pipeline for placeholder resolution. + fail_stack: Mutable stack of failures. If a primitive task in the DAG fails, the task name is appended. If a multitask DAG fails, the DAG name is appended. If the pipeline executes successfully, fail_stack will be empty throughout the full local execution call stack. + + Returns: + A two-tuple of (outputs, status). If status is FAILURE, outputs is an empty dictionary. 
+ """ + + # Original DAG execution logic for simple pipelines + from .orchestrator_utils import OrchestratorUtils + + dag_arguments_with_defaults = OrchestratorUtils.join_user_inputs_and_defaults( + dag_arguments=dag_arguments, + dag_inputs_spec=dag_component_spec.input_definitions, + ) + + # prepare IOStore for DAG + io_store = io.IOStore() + for k, v in dag_arguments_with_defaults.items(): + io_store.put_parent_input(k, v) + + # execute tasks in order + dag_spec = dag_component_spec.dag + sorted_tasks = graph_utils.topological_sort_tasks(dag_spec.tasks) + while sorted_tasks: + task_name = sorted_tasks.pop() + task_spec = dag_spec.tasks[task_name] + component_name = task_spec.component_ref.name + component_spec = components[component_name] + implementation = component_spec.WhichOneof('implementation') + if implementation == 'dag': + # unlikely to exceed default max recursion depth of 1000 + outputs, task_status = run_dag( + pipeline_resource_name=pipeline_resource_name, + dag_component_spec=component_spec, + components=components, + executors=executors, + dag_arguments=OrchestratorUtils.make_task_arguments( + task_spec.inputs, + io_store, + ), + pipeline_root=pipeline_root, + runner=runner, + unique_pipeline_id=unique_pipeline_id, + fail_stack=fail_stack, + ) + else: + # Use consolidated task execution logic from OrchestratorUtils + outputs, task_status = OrchestratorUtils.execute_single_task( + task_name=task_name, + task_spec=task_spec, + pipeline_resource_name=pipeline_resource_name, + components=components, + executors=executors, + io_store=io_store, + pipeline_root=pipeline_root, + runner=runner, + unique_pipeline_id=unique_pipeline_id, + fail_stack=fail_stack, + ) + + if task_status == status.Status.FAILURE: + fail_stack.append(task_name) + return {}, status.Status.FAILURE + + # update IO store on success + elif task_status == status.Status.SUCCESS: + for key, output in outputs.items(): + io_store.put_task_output( + task_name, + key, + output, + ) + else: + raise ValueError(f'Got unknown task status: {task_status.name}') + + dag_outputs = OrchestratorUtils.get_dag_outputs( + dag_outputs_spec=dag_component_spec.dag.outputs, + io_store=io_store, + ) + return dag_outputs, status.Status.SUCCESS \ No newline at end of file diff --git a/sdk/python/kfp/local/orchestrator/enhanced_dag_orchestrator.py b/sdk/python/kfp/local/orchestrator/enhanced_dag_orchestrator.py new file mode 100644 index 00000000000..9cca6396786 --- /dev/null +++ b/sdk/python/kfp/local/orchestrator/enhanced_dag_orchestrator.py @@ -0,0 +1,944 @@ +#!/usr/bin/env python3 +# Copyright 2024 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
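The ConditionEvaluator defined just below resolves pipelinechannel-- references with a regex and then evaluates the rewritten expression in a restricted eval environment. A standalone sketch of that flow, using an assumed condition string (the exact text the compiler emits may differ):

import re

# Assumed example of a trigger-policy condition referencing an upstream output.
condition = "pipelinechannel--flip-coin-Output == 'heads'"
resolved_values = {'pipelinechannel--flip-coin-Output': 'heads'}

# Same channel pattern as ConditionEvaluator._extract_pipeline_channels below.
for ref in re.findall(r'pipelinechannel--[a-zA-Z0-9_\-]+', condition):
    condition = condition.replace(ref, repr(resolved_values[ref]))

# Restricted evaluation, mirroring ConditionEvaluator.evaluate_condition.
print(eval(condition, {'__builtins__': {}}, {}))  # True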
+"""Enhanced DAG orchestrator with support for dsl.Condition and +dsl.ParallelFor.""" + +import concurrent.futures +import logging +import re +from typing import Any, Dict, List, Tuple + +from kfp.local import config +from kfp.local import graph_utils +from kfp.local import io +from kfp.local import status +from kfp.pipeline_spec import pipeline_spec_pb2 + +from .orchestrator_utils import OrchestratorUtils + +Outputs = Dict[str, Any] + + +class ConditionEvaluator: + """Evaluates condition expressions for dsl.Condition support.""" + + @staticmethod + def _extract_pipeline_channels(condition: str) -> List[str]: + """Extract pipeline channel references from condition string. + + Args: + condition: The condition expression + + Returns: + List of pipeline channel references found + """ + # Look for patterns like pipelinechannel--task-name-output-name + pattern = r'pipelinechannel--[a-zA-Z0-9_\-]+' + return re.findall(pattern, condition) + + @staticmethod + def _resolve_pipeline_channel(channel_ref: str, + io_store: io.IOStore) -> Any: + """Resolve a pipeline channel reference to its actual value. + + Args: + channel_ref: The pipeline channel reference string + io_store: IOStore containing values + + Returns: + The resolved value + """ + # Remove the pipelinechannel-- prefix + actual_ref = channel_ref.replace('pipelinechannel--', '') + + # Check if this is a task output reference + if '-Output' in actual_ref or '--' in actual_ref: + # Parse task output reference: task-name-Output or task-name--output-name + if '--' in actual_ref: + parts = actual_ref.split('--') + output_task_name = parts[0] + output_key = parts[1] if len(parts) > 1 else 'Output' + else: + parts = actual_ref.split('-Output') + output_task_name = parts[0] + output_key = 'Output' + + try: + return io_store.get_task_output(output_task_name, output_key) + except Exception: + logging.warning( + f'Could not resolve task output: {output_task_name}.{output_key}' + ) + return None + else: + # This is a parent input parameter + try: + return io_store.get_parent_input(actual_ref) + except Exception: + logging.warning(f'Could not resolve parent input: {actual_ref}') + return None + + @staticmethod + def evaluate_condition( + condition: str, + io_store: io.IOStore, + ) -> bool: + """Evaluates a condition string using available values from IOStore. 
+ + Args: + condition: The condition expression to evaluate + io_store: IOStore containing available values + + Returns: + True if condition evaluates to True, False otherwise + """ + if not condition or not condition.strip(): + return True + + try: + # Create a safe evaluation environment + safe_condition = condition + + # Extract and resolve pipeline channel references + pipeline_channels = ConditionEvaluator._extract_pipeline_channels( + condition) + + for channel_ref in pipeline_channels: + value = ConditionEvaluator._resolve_pipeline_channel( + channel_ref, io_store) + if value is not None: + # Replace the reference with the actual value + # Convert value to string representation for substitution + if isinstance(value, str): + safe_condition = safe_condition.replace( + channel_ref, f"'{value}'") + else: + safe_condition = safe_condition.replace( + channel_ref, str(value)) + else: + logging.warning( + f'Could not resolve channel reference: {channel_ref}') + return False + + # Use a restricted evaluation environment for safety + allowed_names = { + '__builtins__': {}, + 'True': True, + 'False': False, + 'None': None, + 'int': int, + 'float': float, + 'str': str, + 'bool': bool, + 'len': len, + 'min': min, + 'max': max, + } + + # Add basic comparison and logical operators by evaluating in restricted context + result = eval(safe_condition, allowed_names, {}) + return bool(result) + + except Exception as e: + logging.warning( + f'Condition evaluation failed for "{condition}": {e}') + return False + + +class ParallelExecutor: + """Handles parallel execution of tasks for dsl.ParallelFor support.""" + + def __init__(self, max_workers: int = None): + """Initialize parallel executor. + + Args: + max_workers: Maximum number of parallel workers. If None, uses conservative default. + """ + # Use conservative default to avoid thread explosion with nested ParallelFor loops + # Conservative max of 2 prevents exponential thread growth: + # - 2 levels deep: 2 × 2 = 4 threads + # - 3 levels deep: 2 × 2 × 2 = 8 threads + self.conservative_max = 2 + # If max_workers not specified (None), use conservative default + # If specified, still cap it to prevent thread explosion in nested loops + if max_workers is None: + self.max_workers = self.conservative_max + else: + # Cap user-specified max_workers at conservative maximum + self.max_workers = min(max_workers, self.conservative_max) + + def execute_parallel_tasks( + self, + tasks: List[Tuple[str, pipeline_spec_pb2.PipelineTaskSpec]], + pipeline_resource_name: str, + components: Dict[str, pipeline_spec_pb2.ComponentSpec], + executors: Dict[ + str, pipeline_spec_pb2.PipelineDeploymentConfig.ExecutorSpec], + io_store: io.IOStore, + pipeline_root: str, + runner: config.LocalRunnerType, + unique_pipeline_id: str, + fail_stack: List[str], + parallelism_limit: int = 0, + ) -> Tuple[Dict[str, Outputs], status.Status]: + """Execute tasks in parallel with optional parallelism limit. 
+ + Args: + tasks: List of (task_name, task_spec) tuples to execute + pipeline_resource_name: The root pipeline resource name + components: Component specifications + executors: Executor specifications + io_store: IOStore for this execution context + pipeline_root: Local pipeline root directory + runner: Local runner configuration + unique_pipeline_id: Unique pipeline identifier + fail_stack: Mutable failure stack + parallelism_limit: Maximum parallel executions (0 = unlimited) + + Returns: + Tuple of (all_outputs, overall_status) + """ + # Determine max_workers: use parallelism_limit if specified, otherwise use default + requested_workers = parallelism_limit if parallelism_limit > 0 else self.max_workers + # Always cap at conservative maximum to prevent thread explosion in nested loops + max_workers = min(requested_workers, self.conservative_max) + all_outputs = {} + + executor = concurrent.futures.ThreadPoolExecutor( + max_workers=max_workers) + try: + # Submit all tasks + future_to_task = {} + for task_name, task_spec in tasks: + future = executor.submit( + execute_task, + task_name=task_name, + task_spec=task_spec, + pipeline_resource_name=pipeline_resource_name, + components=components, + executors=executors, + io_store=io_store, + pipeline_root=pipeline_root, + runner=runner, + unique_pipeline_id=unique_pipeline_id, + fail_stack=fail_stack, + ) + future_to_task[future] = task_name + + # Collect results + for future in concurrent.futures.as_completed(future_to_task): + task_name = future_to_task[future] + try: + outputs, task_status = future.result() + + if task_status == status.Status.FAILURE: + fail_stack.append(task_name) + return {}, status.Status.FAILURE + + all_outputs[task_name] = outputs + + # Update IO store with outputs + for key, output in outputs.items(): + io_store.put_task_output(task_name, key, output) + + except Exception as e: + logging.error( + f'Task {task_name} failed with exception: {e}') + fail_stack.append(task_name) + return {}, status.Status.FAILURE + + return all_outputs, status.Status.SUCCESS + finally: + # Explicitly shutdown the executor and wait for threads to terminate + executor.shutdown(wait=True) + + +def _has_valid_iterator(task_spec: pipeline_spec_pb2.PipelineTaskSpec) -> bool: + """Check if a task spec has a valid iterator configuration for ParallelFor. + + Args: + task_spec: The pipeline task specification + + Returns: + True if this is a valid ParallelFor task, False otherwise + """ + iterator_type = task_spec.WhichOneof('iterator') + if not iterator_type: + return False + + if iterator_type == 'parameter_iterator': + param_iter = task_spec.parameter_iterator + items_spec = param_iter.items + # Check if the iterator has valid items configuration + return (items_spec.HasField('input_parameter') or + items_spec.HasField('raw')) + elif iterator_type == 'artifact_iterator': + artifact_iter = task_spec.artifact_iterator + items_spec = artifact_iter.items + # Check if the iterator has valid items configuration + return bool(items_spec.input_artifact) + + return False + + +def _get_artifact_iterator_items( + task_spec: pipeline_spec_pb2.PipelineTaskSpec, + io_store: io.IOStore, + task_name: str, +) -> Tuple[List[Any], str, int]: + """Get items from an artifact iterator. 
+ + Args: + task_spec: The pipeline task specification with artifact_iterator + io_store: IOStore containing values + task_name: The task name for logging + + Returns: + Tuple of (items list, artifact_item_input name, parallelism_limit) + + Raises: + ValueError: If items cannot be resolved + """ + artifact_iter = task_spec.artifact_iterator + items_spec = artifact_iter.items + artifact_item_input = artifact_iter.item_input + + if not items_spec.input_artifact: + raise ValueError(f'Unknown artifact items type for task {task_name}') + + # Get artifacts from input artifact (from IOStore) + artifact_name = items_spec.input_artifact + + items = None + # Check if this is a task output artifact or a parent input artifact + if '-' in artifact_name: + # Try to parse as task-output format + # Format could be: task-name-output-key or pipelinechannel--task-name-output-key + actual_name = artifact_name.replace('pipelinechannel--', '') + # Try to find the producer task and output key + # The format is typically: producer-task-output-key + parts = actual_name.rsplit('-', 1) + if len(parts) == 2: + producer_task = parts[0] + output_key = parts[1] + try: + items = io_store.get_task_output(producer_task, output_key) + except ValueError: + # Try with full name as parent input + try: + items = io_store.get_parent_input(artifact_name) + except ValueError: + items = io_store.get_parent_input(actual_name) + else: + items = io_store.get_parent_input(artifact_name) + else: + items = io_store.get_parent_input(artifact_name) + + # Get parallelism limit from iterator_policy if available + parallelism_limit = 0 + if task_spec.HasField('iterator_policy'): + parallelism_limit = task_spec.iterator_policy.parallelism_limit + + return items, artifact_item_input, parallelism_limit + + +def _evaluate_parameter_expression(loop_item: Any, expression: str) -> Any: + """Evaluate a parameter expression selector against a loop item. 
+ + Args: + loop_item: The current loop item value + expression: The parameter expression selector (e.g., 'parseJson(string_value)["A_a"]') + + Returns: + The extracted value from the loop item + """ + try: + # Handle parseJson expressions + if expression.startswith('parseJson('): + # Extract the field path from the expression + # Expression format: parseJson(string_value)["field_name"] + if '["' in expression and '"]' in expression: + start_idx = expression.find('["') + 2 + end_idx = expression.find('"]') + field_name = expression[start_idx:end_idx] + + # If loop_item is already a dict, extract the field directly + if isinstance(loop_item, dict): + if field_name in loop_item: + return loop_item[field_name] + else: + raise KeyError( + f'Field "{field_name}" not found in loop item: {loop_item}' + ) + # If loop_item is a JSON string, parse it first + elif isinstance(loop_item, str): + import json + parsed_item = json.loads(loop_item) + if field_name in parsed_item: + return parsed_item[field_name] + else: + raise KeyError( + f'Field "{field_name}" not found in parsed loop item: {parsed_item}' + ) + else: + logging.warning( + f'Cannot parse JSON from loop item type: {type(loop_item)}' + ) + return loop_item + else: + # No field extraction, just parse JSON + if isinstance(loop_item, str): + import json + return json.loads(loop_item) + else: + return loop_item + + # Handle other expressions (can be extended as needed) + elif '[' in expression and ']' in expression: + # Handle direct field access like item["field"] + start_idx = expression.find('["') + 2 + end_idx = expression.find('"]') + if start_idx > 1 and end_idx > start_idx: + field_name = expression[start_idx:end_idx] + if isinstance(loop_item, dict): + if field_name in loop_item: + return loop_item[field_name] + else: + raise KeyError( + f'Field "{field_name}" not found in loop item: {loop_item}' + ) + + # Default: return the loop item as-is + return loop_item + + except Exception as e: + logging.warning( + f'Failed to evaluate parameter expression "{expression}": {e}') + return loop_item + + +def _create_loop_iteration_task_spec( + original_task_spec: pipeline_spec_pb2.PipelineTaskSpec, + loop_item: Any, + iteration_index: int, + loop_task_name: str, + components: Dict[str, pipeline_spec_pb2.ComponentSpec] = None, +) -> pipeline_spec_pb2.PipelineTaskSpec: + """Create a modified task spec for a loop iteration. + + Args: + original_task_spec: The original task specification + loop_item: The current loop item value + iteration_index: The iteration index + loop_task_name: The original loop task name + components: Component specifications (needed to get loop item parameter name) + + Returns: + Modified task spec for this iteration + """ + from kfp.compiler import pipeline_spec_builder + from kfp.local import executor_output_utils + + # Create a proper copy of the original task spec using protobuf methods + # (deepcopy doesn't work correctly with protobuf objects) + iteration_task_spec = pipeline_spec_pb2.PipelineTaskSpec() + iteration_task_spec.CopyFrom(original_task_spec) + + # Get the loop item parameter name from the iterator + iterator_type = original_task_spec.WhichOneof('iterator') + loop_item_param_name = None + if iterator_type == 'parameter_iterator': + param_iter = original_task_spec.parameter_iterator + loop_item_param_name = param_iter.item_input + + # The nested DAG component's input definition expects the loop-item parameter name, + # so we ALWAYS need to add it with the correct name. 
This is required because the + # component's input definitions use the -loop-item suffix while the task spec + # might use a different parameter name. + if loop_item_param_name: + # Create the input parameter for the loop item with the correct name + new_input = iteration_task_spec.inputs.parameters[loop_item_param_name] + new_constant = pipeline_spec_builder.to_protobuf_value(loop_item) + new_input.runtime_value.constant.CopyFrom(new_constant) + + # Remove the iterator since this is now a regular task + iteration_task_spec.ClearField('parameter_iterator') + iteration_task_spec.ClearField('artifact_iterator') + + # Find and replace loop item references in task inputs + for input_name, input_spec in iteration_task_spec.inputs.parameters.items(): + + if input_spec.HasField('runtime_value'): + # Check if this references the loop item + runtime_value = input_spec.runtime_value + if runtime_value.WhichOneof('value') == 'constant': + # This is a constant - might need to substitute loop item + constant_value = executor_output_utils.pb2_value_to_python( + runtime_value.constant) + + # Check if the constant value contains loop item placeholders + if isinstance(constant_value, + str) and 'pipelinechannel--' in constant_value: + # This might be a loop item reference + if f'{loop_task_name}-loop-item' in constant_value: + # Replace with actual loop item value + new_constant = pipeline_spec_builder.to_protobuf_value( + loop_item) + runtime_value.constant.CopyFrom(new_constant) + + elif input_spec.HasField('task_output_parameter'): + # This is a reference to an upstream task output + # In ParallelFor, we need to replace this with the loop item value + input_spec.ClearField('task_output_parameter') + new_constant = pipeline_spec_builder.to_protobuf_value(loop_item) + input_spec.runtime_value.constant.CopyFrom(new_constant) + + elif input_spec.HasField('component_input_parameter'): + # Check if this references a loop item parameter + param_name = input_spec.component_input_parameter + + # Check if parameter expression selector exists (scalar field - check value not presence) + has_expression_selector = bool( + input_spec.parameter_expression_selector) + + if param_name.endswith( + '-loop-item' + ) or f'{loop_task_name}-loop-item' in param_name: + # Handle parameter expression selector if present + if has_expression_selector: + # Extract the specific value using the expression selector + expression = input_spec.parameter_expression_selector + extracted_value = _evaluate_parameter_expression( + loop_item, expression) + + # Replace with the extracted value + input_spec.ClearField('component_input_parameter') + input_spec.ClearField('parameter_expression_selector') + if extracted_value is None: + raise ValueError("Extracted value is None") + new_constant = pipeline_spec_builder.to_protobuf_value( + extracted_value) + input_spec.runtime_value.constant.CopyFrom(new_constant) + else: + # Replace with the entire loop item value + input_spec.ClearField('component_input_parameter') + if loop_item is None: + raise ValueError("Loop item is None") + new_constant = pipeline_spec_builder.to_protobuf_value( + loop_item) + input_spec.runtime_value.constant.CopyFrom(new_constant) + else: + # Only replace if this parameter matches the specific loop item parameter name + # from this ParallelFor's iterator. This avoids incorrectly replacing other + # parent pipeline inputs that nested loops may need to access. 
+ if loop_item_param_name and param_name == loop_item_param_name: + input_spec.ClearField('component_input_parameter') + new_constant = pipeline_spec_builder.to_protobuf_value( + loop_item) + input_spec.runtime_value.constant.CopyFrom(new_constant) + + return iteration_task_spec + + +def _create_artifact_loop_iteration_task_spec( + original_task_spec: pipeline_spec_pb2.PipelineTaskSpec, + loop_item: Any, + iteration_index: int, + loop_task_name: str, + artifact_item_input: str, +) -> pipeline_spec_pb2.PipelineTaskSpec: + """Create a modified task spec for an artifact iterator loop iteration. + + Args: + original_task_spec: The original task specification + loop_item: The current loop item (an artifact) + iteration_index: The iteration index + loop_task_name: The original loop task name + artifact_item_input: The name of the artifact input for the loop item + + Returns: + Modified task spec for this iteration + """ + # Create a proper copy of the original task spec using protobuf methods + iteration_task_spec = pipeline_spec_pb2.PipelineTaskSpec() + iteration_task_spec.CopyFrom(original_task_spec) + + # Remove the iterator since this is now a regular task + iteration_task_spec.ClearField('parameter_iterator') + iteration_task_spec.ClearField('artifact_iterator') + + # For artifact iterators, we need to handle artifact inputs + # Update artifact input references to use iteration-specific keys + # This allows parallel execution without race conditions + + # Find and update artifact inputs that reference the loop item + for input_name, input_spec in iteration_task_spec.inputs.artifacts.items(): + if input_spec.HasField('component_input_artifact'): + artifact_name = input_spec.component_input_artifact + # Check if this references the loop item artifact + if (artifact_name == artifact_item_input or + artifact_name.endswith('-loop-item') or + f'{loop_task_name}-loop-item' in artifact_name): + # Update the reference to use an iteration-specific key + # This allows each iteration to have its own artifact in the shared io_store + iteration_key = f'{artifact_name}-iteration-{iteration_index}' + input_spec.component_input_artifact = iteration_key + + return iteration_task_spec + + +def run_enhanced_dag( + pipeline_resource_name: str, + dag_component_spec: pipeline_spec_pb2.ComponentSpec, + executors: Dict[str, + pipeline_spec_pb2.PipelineDeploymentConfig.ExecutorSpec], + components: Dict[str, pipeline_spec_pb2.ComponentSpec], + dag_arguments: Dict[str, Any], + pipeline_root: str, + runner: config.LocalRunnerType, + unique_pipeline_id: str, + fail_stack: List[str], +) -> Tuple[Outputs, status.Status]: + """Enhanced DAG runner with support for dsl.Condition and dsl.ParallelFor. + + This is an enhanced version of dag_orchestrator.run_dag that + supports control flow features like conditions and parallel loops. 
+ """ + dag_arguments_with_defaults = OrchestratorUtils.join_user_inputs_and_defaults( + dag_arguments=dag_arguments, + dag_inputs_spec=dag_component_spec.input_definitions, + ) + + # prepare IOStore for DAG + io_store = io.IOStore() + for k, v in dag_arguments_with_defaults.items(): + io_store.put_parent_input(k, v) + + dag_spec = dag_component_spec.dag + + # Group tasks by control flow type + regular_tasks = [] + condition_tasks = [] + parallel_for_tasks = [] + + for task_name, task_spec in dag_spec.tasks.items(): + if task_spec.trigger_policy.condition: + condition_tasks.append((task_name, task_spec)) + elif task_spec.WhichOneof('iterator') and _has_valid_iterator( + task_spec): + parallel_for_tasks.append((task_name, task_spec)) + else: + regular_tasks.append((task_name, task_spec)) + + # Execute regular tasks first (topologically sorted) + regular_task_dict = {name: spec for name, spec in regular_tasks} + if regular_task_dict: + sorted_task_names = graph_utils.topological_sort_tasks( + regular_task_dict) + + while sorted_task_names: + task_name = sorted_task_names.pop() + task_spec = regular_task_dict[task_name] + + # Use execute_task which handles both executor and nested DAG implementations + outputs, task_status = execute_task( + task_name=task_name, + task_spec=task_spec, + pipeline_resource_name=pipeline_resource_name, + components=components, + executors=executors, + io_store=io_store, + pipeline_root=pipeline_root, + runner=runner, + unique_pipeline_id=unique_pipeline_id, + fail_stack=fail_stack, + ) + + if task_status == status.Status.FAILURE: + return {}, status.Status.FAILURE + + # Update IO store on success + for key, output in outputs.items(): + io_store.put_task_output(task_name, key, output) + + # Execute conditional tasks + condition_evaluator = ConditionEvaluator() + for task_name, task_spec in condition_tasks: + # Evaluate condition + condition_expr = task_spec.trigger_policy.condition + should_execute = condition_evaluator.evaluate_condition( + condition_expr, io_store) + + if should_execute: + outputs, task_status = execute_task( + task_name=task_name, + task_spec=task_spec, + pipeline_resource_name=pipeline_resource_name, + components=components, + executors=executors, + io_store=io_store, + pipeline_root=pipeline_root, + runner=runner, + unique_pipeline_id=unique_pipeline_id, + fail_stack=fail_stack, + ) + + if task_status == status.Status.FAILURE: + return {}, status.Status.FAILURE + + # Update IO store on success + for key, output in outputs.items(): + io_store.put_task_output(task_name, key, output) + else: + logging.info( + f'Skipping conditional task {task_name} (condition evaluated to False)' + ) + + # Execute parallel for tasks + parallel_executor = ParallelExecutor() + for task_name, task_spec in parallel_for_tasks: + # Get iterator configuration + iterator = task_spec.WhichOneof('iterator') + if iterator == 'parameter_iterator': + param_iter = task_spec.parameter_iterator + # Get items from the ItemsSpec + items_spec = param_iter.items + if items_spec.HasField('input_parameter'): + # Get items from input parameter (from IOStore) + param_name = items_spec.input_parameter + # Check if this is a task output parameter or a parent input parameter + if param_name.startswith('pipelinechannel--'): + # Remove the prefix but check if it contains a task output reference + actual_param = param_name.replace('pipelinechannel--', + '').replace( + '-loop-item', '') + # Check if this looks like a task output (task-name-output-name) + if '-Output' in actual_param: + # This 
is a task output, split it + parts = actual_param.split('-Output') + output_task_name = parts[ + 0] # Don't overwrite the loop variable! + output_key = 'Output' + items = io_store.get_task_output( + output_task_name, output_key) + else: + # This is a parent input - try both the stripped name and + # the full pipelinechannel name since IOStore keys may use either + try: + items = io_store.get_parent_input(actual_param) + except Exception: + # Try with the pipelinechannel prefix + items = io_store.get_parent_input( + f'pipelinechannel--{actual_param}') + else: + # Direct parameter name + items = io_store.get_parent_input(param_name) + elif items_spec.HasField('raw'): + # Parse raw JSON string + import json + items = json.loads(items_spec.raw) + else: + logging.warning(f'Unknown items type for task {task_name}') + continue + parallelism_limit = getattr(param_iter, 'parallelism_limit', 0) + elif iterator == 'artifact_iterator': + # Handle artifact iterator using helper function + try: + items, artifact_item_input, parallelism_limit = _get_artifact_iterator_items( + task_spec, io_store, task_name) + except ValueError as e: + logging.warning(f'Failed to get artifact iterator items: {e}') + continue + else: + logging.warning( + f'Unknown iterator type for task {task_name}: {iterator}') + continue + + # Create parallel tasks for each loop item + loop_tasks = [] + for i, item in enumerate(items): + # Create a unique task name for this iteration + iteration_task_name = f"{task_name}-iteration-{i}" + + # Create a modified task spec for this iteration + if iterator == 'artifact_iterator': + iteration_task_spec = _create_artifact_loop_iteration_task_spec( + task_spec, item, i, task_name, artifact_item_input) + # For artifact iterators, store the artifact in io_store with iteration-specific keys + # Use artifact_item_input as base since that's what the nested component expects + # (e.g., pipelinechannel--make-artifacts-Output-loop-item-iteration-0) + iteration_key = f'{artifact_item_input}-iteration-{i}' + io_store.put_parent_input(iteration_key, item) + else: + iteration_task_spec = _create_loop_iteration_task_spec( + task_spec, item, i, task_name) + + loop_tasks.append((iteration_task_name, iteration_task_spec)) + + # Execute all loop iterations in parallel + if loop_tasks: + + parallel_outputs, parallel_status = parallel_executor.execute_parallel_tasks( + tasks=loop_tasks, + pipeline_resource_name=pipeline_resource_name, + components=components, + executors=executors, + io_store=io_store, + pipeline_root=pipeline_root, + runner=runner, + unique_pipeline_id=unique_pipeline_id, + fail_stack=fail_stack, + parallelism_limit=parallelism_limit, + ) + + if parallel_status == status.Status.FAILURE: + return {}, status.Status.FAILURE + + # Aggregate loop outputs - collect all iteration outputs under the main task name + aggregated_outputs = {} + for iteration_name, outputs in parallel_outputs.items(): + # Store outputs with iteration index for later aggregation + iteration_index = iteration_name.split('-iteration-')[-1] + for output_key, output_value in outputs.items(): + if output_key not in aggregated_outputs: + aggregated_outputs[output_key] = {} + aggregated_outputs[output_key][ + iteration_index] = output_value + + # Store the aggregated outputs in the main task's namespace + for output_key, iteration_outputs in aggregated_outputs.items(): + # Create a list of outputs in iteration order + ordered_outputs = [ + iteration_outputs.get(str(i)) for i in range(len(items)) + ] + 
io_store.put_task_output(task_name, output_key, ordered_outputs) + + # Get DAG outputs + dag_outputs = OrchestratorUtils.get_dag_outputs( + dag_outputs_spec=dag_component_spec.dag.outputs, + io_store=io_store, + ) + + return dag_outputs, status.Status.SUCCESS + + +def execute_task( + task_name: str, + task_spec: pipeline_spec_pb2.PipelineTaskSpec, + pipeline_resource_name: str, + components: Dict[str, pipeline_spec_pb2.ComponentSpec], + executors: Dict[str, + pipeline_spec_pb2.PipelineDeploymentConfig.ExecutorSpec], + io_store: io.IOStore, + pipeline_root: str, + runner: config.LocalRunnerType, + unique_pipeline_id: str, + fail_stack: List[str], +) -> Tuple[Outputs, status.Status]: + """Execute a single task.""" + component_name = task_spec.component_ref.name + component_spec = components[component_name] + implementation = component_spec.WhichOneof('implementation') + + if implementation == 'dag': + # For ParallelFor iteration tasks, we need to handle the case where + # the iteration task spec has no inputs but we need to pass arguments + # to the nested DAG based on the component's input definitions + if not task_spec.inputs.parameters: + # This is likely a ParallelFor iteration with no inputs + # Use the component's input definitions with default values + task_arguments = OrchestratorUtils.join_user_inputs_and_defaults( + dag_arguments={}, + dag_inputs_spec=component_spec.input_definitions, + ) + else: + task_arguments = OrchestratorUtils.make_task_arguments( + task_spec.inputs, io_store) + + # For nested DAGs (especially ParallelFor iterations), we need to ensure that + # any parent inputs required by the component's input definitions are passed through. + # This handles cases like nested ParallelFor loops that reference parent pipeline inputs. + for input_name in component_spec.input_definitions.parameters: + if input_name not in task_arguments: + # This input is needed by the component but not in task_arguments + # Try to get it from the parent IOStore + try: + parent_value = io_store.get_parent_input(input_name) + task_arguments[input_name] = parent_value + except Exception: + # Input not available in parent IOStore, will use default if available + pass + + # Also handle artifact inputs for artifact iterators + # For iteration tasks (e.g., for-loop-1-iteration-0), extract the iteration index + # to look up artifacts stored with iteration-specific keys + iteration_index = None + if '-iteration-' in task_name: + try: + iteration_index = int(task_name.split('-iteration-')[-1]) + except ValueError: + pass + + for input_name in component_spec.input_definitions.artifacts: + if input_name not in task_arguments: + # This artifact is needed by the component but not in task_arguments + # Try to get it from the parent IOStore + try: + # For iteration tasks, try the iteration-specific key first + if iteration_index is not None: + iteration_key = f'{input_name}-iteration-{iteration_index}' + try: + parent_value = io_store.get_parent_input( + iteration_key) + task_arguments[input_name] = parent_value + continue + except Exception: + pass + # Fall back to the regular key + parent_value = io_store.get_parent_input(input_name) + task_arguments[input_name] = parent_value + except Exception: + # Artifact not available in parent IOStore + pass + + # For nested DAGs (especially ParallelFor iterations), include the task name + # in the pipeline_resource_name to ensure unique output paths for each iteration + nested_resource_name = f"{pipeline_resource_name}/{task_name}" + + return run_enhanced_dag( + 
pipeline_resource_name=nested_resource_name, + dag_component_spec=component_spec, + components=components, + executors=executors, + dag_arguments=task_arguments, + pipeline_root=pipeline_root, + runner=runner, + unique_pipeline_id=unique_pipeline_id, + fail_stack=fail_stack, + ) + + else: + return OrchestratorUtils.execute_single_task( + task_name=task_name, + task_spec=task_spec, + pipeline_resource_name=pipeline_resource_name, + components=components, + executors=executors, + io_store=io_store, + pipeline_root=pipeline_root, + runner=runner, + unique_pipeline_id=unique_pipeline_id, + fail_stack=fail_stack, + ) diff --git a/sdk/python/kfp/local/orchestrator/orchestrator_utils.py b/sdk/python/kfp/local/orchestrator/orchestrator_utils.py new file mode 100644 index 00000000000..4dd5e849ed7 --- /dev/null +++ b/sdk/python/kfp/local/orchestrator/orchestrator_utils.py @@ -0,0 +1,347 @@ +import copy +import logging +from typing import Any, Dict, List, Tuple + +from kfp.local import config +from kfp.local import importer_handler +from kfp.local import io +from kfp.local import status +from kfp.local import task_dispatcher +from kfp.pipeline_spec import pipeline_spec_pb2 + +Outputs = Dict[str, Any] + + +class OrchestratorUtils: + + @classmethod + def _apply_expression_selector(cls, value: Any, expression: str) -> Any: + """Apply a parameter expression selector to extract a field from a + value. + + Args: + value: The value to extract from (typically a dict) + expression: The expression selector (e.g., 'parseJson(string_value)["A_a"]') + + Returns: + The extracted value + """ + import json as json_module + + try: + # Handle parseJson expressions + if expression.startswith('parseJson('): + # Extract the field path from the expression + # Expression format: parseJson(string_value)["field_name"] + if '["' in expression and '"]' in expression: + start_idx = expression.find('["') + 2 + end_idx = expression.find('"]') + field_name = expression[start_idx:end_idx] + + # If value is already a dict, extract the field directly + if isinstance(value, dict): + if field_name in value: + return value[field_name] + else: + logging.warning( + f'Field "{field_name}" not found in value: {value}' + ) + return value + # If value is a JSON string, parse it first + elif isinstance(value, str): + parsed_value = json_module.loads(value) + if field_name in parsed_value: + return parsed_value[field_name] + else: + logging.warning( + f'Field "{field_name}" not found in parsed value: {parsed_value}' + ) + return value + else: + logging.warning( + f'Cannot parse JSON from value type: {type(value)}') + return value + else: + # No field extraction, just parse JSON if string + if isinstance(value, str): + return json_module.loads(value) + else: + return value + + # Handle direct field access like item["field"] + elif '[' in expression and ']' in expression: + start_idx = expression.find('["') + 2 + end_idx = expression.find('"]') + if start_idx > 1 and end_idx > start_idx: + field_name = expression[start_idx:end_idx] + if isinstance(value, dict): + if field_name in value: + return value[field_name] + else: + logging.warning( + f'Field "{field_name}" not found in value: {value}' + ) + + # Default: return the value as-is + return value + + except Exception as e: + logging.warning( + f'Failed to apply expression selector "{expression}": {e}') + return value + + @classmethod + def execute_single_task( + cls, + task_name: str, + task_spec: pipeline_spec_pb2.PipelineTaskSpec, + pipeline_resource_name: str, + components: Dict[str, 
+                         pipeline_spec_pb2.ComponentSpec],
+        executors: Dict[
+            str, pipeline_spec_pb2.PipelineDeploymentConfig.ExecutorSpec],
+        io_store: io.IOStore,
+        pipeline_root: str,
+        runner: config.LocalRunnerType,
+        unique_pipeline_id: str,
+        fail_stack: List[str],
+    ) -> Tuple[Outputs, status.Status]:
+        """Execute a single task (used by parallel executor)."""
+        component_name = task_spec.component_ref.name
+        component_spec = components[component_name]
+        implementation = component_spec.WhichOneof('implementation')
+
+        if implementation == 'executor_label':
+            executor_spec = executors[component_spec.executor_label]
+            task_arguments = cls.make_task_arguments(
+                task_inputs_spec=task_spec.inputs,
+                io_store=io_store,
+            )
+
+            if executor_spec.WhichOneof('spec') == 'importer':
+                return importer_handler.run_importer(
+                    pipeline_resource_name=pipeline_resource_name,
+                    component_name=component_name,
+                    component_spec=component_spec,
+                    executor_spec=executor_spec,
+                    arguments=task_arguments,
+                    pipeline_root=pipeline_root,
+                    unique_pipeline_id=unique_pipeline_id,
+                )
+            elif executor_spec.WhichOneof('spec') == 'container':
+                return task_dispatcher.run_single_task_implementation(
+                    pipeline_resource_name=pipeline_resource_name,
+                    component_name=component_name,
+                    component_spec=component_spec,
+                    executor_spec=executor_spec,
+                    arguments=task_arguments,
+                    pipeline_root=pipeline_root,
+                    runner=runner,
+                    raise_on_error=False,
+                    block_input_artifact=False,
+                    unique_pipeline_id=unique_pipeline_id,
+                )
+            else:
+                raise ValueError(
+                    "Got unknown spec in ExecutorSpec. Only 'dsl.component', 'dsl.container_component', and 'dsl.importer' are supported."
+                )
+        else:
+            raise ValueError(
+                f'Got unknown component implementation: {implementation}')
+
+    @classmethod
+    def join_user_inputs_and_defaults(
+        cls,
+        dag_arguments: Dict[str, Any],
+        dag_inputs_spec: pipeline_spec_pb2.ComponentInputsSpec,
+    ) -> Dict[str, Any]:
+        """Collects user-provided arguments and default arguments (when no
+        user-provided argument) into a dictionary. Returns the dictionary.
+
+        Args:
+            dag_arguments: The user-provided arguments to the DAG.
+            dag_inputs_spec: The ComponentInputsSpec for the DAG.
+
+        Returns:
+            The complete DAG inputs, with defaults included where the user-provided argument is missing.
+        """
+        from kfp.local import executor_output_utils
+
+        copied_dag_arguments = copy.deepcopy(dag_arguments)
+
+        for input_name, input_spec in dag_inputs_spec.parameters.items():
+            if input_name not in copied_dag_arguments:
+                copied_dag_arguments[
+                    input_name] = executor_output_utils.pb2_value_to_python(
+                        input_spec.default_value)
+        return copied_dag_arguments
+
+    @classmethod
+    def make_task_arguments(
+        cls,
+        task_inputs_spec: pipeline_spec_pb2.TaskInputsSpec,
+        io_store: io.IOStore,
+    ) -> Dict[str, Any]:
+        """Obtains a dictionary of arguments required to execute the task
+        corresponding to TaskInputsSpec.
+
+        Args:
+            task_inputs_spec: The TaskInputsSpec for the task.
+            io_store: The IOStore of the current DAG. Used to obtain task arguments which come from upstream task outputs and parent component inputs.
+
+        Returns:
+            The arguments for the task.
+ """ + from kfp.local import executor_output_utils + + task_arguments = {} + # handle parameters + for input_name, input_spec in task_inputs_spec.parameters.items(): + + # handle constants + if input_spec.HasField('runtime_value'): + # runtime_value's value should always be constant for the v2 compiler + if input_spec.runtime_value.WhichOneof('value') != 'constant': + raise ValueError('Expected constant.') + task_arguments[ + input_name] = executor_output_utils.pb2_value_to_python( + input_spec.runtime_value.constant) + + # handle upstream outputs + elif input_spec.HasField('task_output_parameter'): + task_arguments[input_name] = io_store.get_task_output( + input_spec.task_output_parameter.producer_task, + input_spec.task_output_parameter.output_parameter_key, + ) + + # handle parent pipeline input parameters + elif input_spec.HasField('component_input_parameter'): + parent_value = io_store.get_parent_input( + input_spec.component_input_parameter) + + # Check if there's a parameter expression selector to apply + # (used in ParallelFor to extract specific fields from structured loop items) + expression_selector = input_spec.parameter_expression_selector + if expression_selector: + logging.info( + f"Applying expression selector '{expression_selector}' to parent value: {parent_value}" + ) + parent_value = cls._apply_expression_selector( + parent_value, expression_selector) + logging.info(f"After expression selector: {parent_value}") + + task_arguments[input_name] = parent_value + + # TODO: support dsl.ExitHandler + elif input_spec.HasField('task_final_status'): + raise NotImplementedError( + "'dsl.ExitHandler' is not yet support for local execution.") + + else: + raise ValueError(f'Missing input for parameter {input_name}.') + + # handle artifacts + for input_name, input_spec in task_inputs_spec.artifacts.items(): + if input_spec.HasField('task_output_artifact'): + task_arguments[input_name] = io_store.get_task_output( + input_spec.task_output_artifact.producer_task, + input_spec.task_output_artifact.output_artifact_key, + ) + elif input_spec.HasField('component_input_artifact'): + task_arguments[input_name] = io_store.get_parent_input( + input_spec.component_input_artifact) + else: + raise ValueError(f'Missing input for artifact {input_name}.') + + return task_arguments + + @classmethod + def get_dag_output_parameters( + cls, + dag_outputs_spec: pipeline_spec_pb2.DagOutputsSpec, + io_store: io.IOStore, + ) -> Dict[str, Any]: + """Gets the DAG output parameter values from a DagOutputsSpec and the + DAG's IOStore. + + Args: + dag_outputs_spec: DagOutputsSpec corresponding to the DAG. + io_store: IOStore corresponding to the DAG. + + Returns: + The DAG output parameters. 
+ """ + outputs = {} + for root_output_key, parameter_selector_spec in dag_outputs_spec.parameters.items( + ): + kind = parameter_selector_spec.WhichOneof('kind') + if kind == 'value_from_parameter': + value_from_parameter = parameter_selector_spec.value_from_parameter + outputs[root_output_key] = io_store.get_task_output( + value_from_parameter.producer_subtask, + value_from_parameter.output_parameter_key, + ) + elif kind == 'value_from_oneof': + raise NotImplementedError( + "'dsl.OneOf' is not yet supported in local execution.") + else: + raise ValueError( + f"Got unknown 'parameter_selector_spec' kind: {kind}") + return outputs + + @classmethod + def get_dag_output_artifacts( + cls, + dag_outputs_spec: pipeline_spec_pb2.DagOutputsSpec, + io_store: io.IOStore, + ) -> Dict[str, Any]: + """Gets the DAG output artifact values from a DagOutputsSpec and the + DAG's IOStore. + + Args: + dag_outputs_spec: DagOutputsSpec corresponding to the DAG. + io_store: IOStore corresponding to the DAG. + + Returns: + The DAG output artifacts. + """ + outputs = {} + for root_output_key, artifact_selector_spec in dag_outputs_spec.artifacts.items( + ): + len_artifact_selectors = len( + artifact_selector_spec.artifact_selectors) + if len_artifact_selectors != 1: + raise ValueError( + f'Expected 1 artifact in ArtifactSelectorSpec. Got: {len_artifact_selectors}' + ) + artifact_selector = artifact_selector_spec.artifact_selectors[0] + outputs[root_output_key] = io_store.get_task_output( + artifact_selector.producer_subtask, + artifact_selector.output_artifact_key, + ) + return outputs + + @classmethod + def get_dag_outputs( + cls, + dag_outputs_spec: pipeline_spec_pb2.DagOutputsSpec, + io_store: io.IOStore, + ) -> Dict[str, Any]: + """Gets the DAG output values from a DagOutputsSpec and the DAG's + IOStore. + + Args: + dag_outputs_spec: DagOutputsSpec corresponding to the DAG. + io_store: IOStore corresponding to the DAG. + + Returns: + The DAG outputs. 
+ """ + output_params = cls.get_dag_output_parameters( + dag_outputs_spec=dag_outputs_spec, + io_store=io_store, + ) + output_artifacts = cls.get_dag_output_artifacts( + dag_outputs_spec=dag_outputs_spec, + io_store=io_store, + ) + return {**output_params, **output_artifacts} diff --git a/sdk/python/kfp/local/pipeline_orchestrator.py b/sdk/python/kfp/local/pipeline_orchestrator.py index 72bfc406af8..1123054947c 100644 --- a/sdk/python/kfp/local/pipeline_orchestrator.py +++ b/sdk/python/kfp/local/pipeline_orchestrator.py @@ -18,7 +18,6 @@ from typing import Any, Dict, List from kfp.local import config -from kfp.local import dag_orchestrator from kfp.local import logging_utils from kfp.local import placeholder_utils from kfp.local import status @@ -89,17 +88,38 @@ def _run_local_pipeline_implementation( components = dict(pipeline_spec.components.items()) fail_stack: List[str] = [] try: - outputs, dag_status = dag_orchestrator.run_dag( - pipeline_resource_name=pipeline_resource_name, - dag_component_spec=pipeline_spec.root, - executors=executors, - components=components, - dag_arguments=arguments, - pipeline_root=pipeline_root, - runner=runner, - unique_pipeline_id=placeholder_utils.make_random_id(), - fail_stack=fail_stack, - ) + dag_component_spec = pipeline_spec.root + dag_spec = dag_component_spec.dag + has_control_flow = any( + task_spec.trigger_policy.condition or + task_spec.WhichOneof('iterator') + for task_spec in dag_spec.tasks.values()) + if has_control_flow: + from kfp.local.orchestrator import enhanced_dag_orchestrator + outputs, dag_status = enhanced_dag_orchestrator.run_enhanced_dag( + pipeline_resource_name=pipeline_resource_name, + dag_component_spec=dag_component_spec, + executors=executors, + components=components, + dag_arguments=arguments, + pipeline_root=pipeline_root, + runner=runner, + unique_pipeline_id=placeholder_utils.make_random_id(), + fail_stack=fail_stack, + ) + else: + from kfp.local.orchestrator import dag_orchestrator + outputs, dag_status = dag_orchestrator.run_dag( + pipeline_resource_name=pipeline_resource_name, + dag_component_spec=dag_component_spec, + executors=executors, + components=components, + dag_arguments=arguments, + pipeline_root=pipeline_root, + runner=runner, + unique_pipeline_id=placeholder_utils.make_random_id(), + fail_stack=fail_stack, + ) if dag_status == status.Status.SUCCESS: status_with_color = logging_utils.format_status( status.Status.SUCCESS) diff --git a/sdk/python/kfp/local/pipeline_orchestrator_test.py b/sdk/python/kfp/local/pipeline_orchestrator_test.py index de710f9799b..2330c2ccca6 100644 --- a/sdk/python/kfp/local/pipeline_orchestrator_test.py +++ b/sdk/python/kfp/local/pipeline_orchestrator_test.py @@ -560,10 +560,19 @@ def pythagorean_then_add( self.assertAlmostEqual(task.output, 44.745992817228334) self.assert_output_dir_contents(1, 7) - def test_parallel_for_not_supported(self): - local.init(local.SubprocessRunner()) + def test_parallel_for_supported(self): + # Use use_venv=False to avoid pip race conditions when installing + # packages in parallel virtual environments + local.init( + local.SubprocessRunner(use_venv=False), + pipeline_root=ROOT_FOR_TESTING) - @dsl.component + # Use the original dsl.component to avoid the fixture's packages_to_install + # which causes pip race conditions when running in parallel + from kfp.dsl import component_decorator + original_component = component_decorator.component + + @original_component def pass_op(): pass @@ -572,14 +581,14 @@ def my_pipeline(): with dsl.ParallelFor([1, 2, 3]): 
                pass_op()
 
-        with self.assertRaisesRegex(
-                NotImplementedError,
-                r"'dsl\.ParallelFor' is not supported by local pipeline execution\."
-        ):
-            my_pipeline()
+        # Should now execute successfully with enhanced orchestrator
+        task = my_pipeline()
+        self.assertIsInstance(task, pipeline_task.PipelineTask)
+        # ParallelFor detection should route to enhanced orchestrator
+        # and execute tasks in parallel
 
-    def test_condition_not_supported(self):
-        local.init(local.SubprocessRunner())
+    def test_condition_supported(self):
+        local.init(local.SubprocessRunner(), pipeline_root=ROOT_FOR_TESTING)
 
         @dsl.component
         def pass_op():
             pass
 
@@ -590,11 +599,11 @@ def my_pipeline(x: str):
         with dsl.Condition(x == 'foo'):
             pass_op()
 
-        with self.assertRaisesRegex(
-                NotImplementedError,
-                r"'dsl\.Condition' is not supported by local pipeline execution\."
-        ):
-            my_pipeline(x='bar')
+        # Should now execute successfully with enhanced orchestrator
+        task = my_pipeline(x='bar')
+        self.assertIsInstance(task, pipeline_task.PipelineTask)
+        # Condition detection should route to enhanced orchestrator
+        # and evaluate the condition (false in this case, so task is skipped)
 
     @mock.patch('sys.stdout', new_callable=stdlib_io.StringIO)
     def test_fails_with_raise_on_error_true(self, mock_stdout):
diff --git a/sdk/python/test/local_execution/local_execution_test.py b/sdk/python/test/local_execution/local_execution_test.py
index ceaa2c4e550..554509de2cf 100644
--- a/sdk/python/test/local_execution/local_execution_test.py
+++ b/sdk/python/test/local_execution/local_execution_test.py
@@ -28,7 +28,6 @@
 dsl.component = functools.partial(
     dsl.component, base_image=base_image, kfp_package_path=_KFP_PACKAGE_PATH)
-
 from test_data.sdk_compiled_pipelines.valid.arguments_parameters import \
     echo as arguments_echo
 from test_data.sdk_compiled_pipelines.valid.critical.add_numbers import \
@@ -51,6 +50,8 @@
     container_no_input
 from test_data.sdk_compiled_pipelines.valid.essential.lightweight_python_functions_with_outputs import \
     pipeline as lightweight_with_outputs_pipeline
+from test_data.sdk_compiled_pipelines.valid.essential.pipeline_with_loops import \
+    my_pipeline as pipeline_with_loops
 from test_data.sdk_compiled_pipelines.valid.hello_world import echo
 from test_data.sdk_compiled_pipelines.valid.identity import identity
 from test_data.sdk_compiled_pipelines.valid.nested_return import nested_return
@@ -58,6 +59,10 @@
     output_metrics
 from test_data.sdk_compiled_pipelines.valid.parameter import \
     crust as parameter_pipeline
+from test_data.sdk_compiled_pipelines.valid.pipeline_with_parallelfor_list_artifacts import \
+    my_pipeline as pipeline_with_parallelfor_list_artifacts
+from test_data.sdk_compiled_pipelines.valid.pipeline_with_parallelfor_parallelism import \
+    my_pipeline as pipeline_with_parallelfor_parallelism
 from test_data.sdk_compiled_pipelines.valid.sequential_v1 import sequential
 
 
@@ -184,6 +189,12 @@ def idfn(val):
         pipeline_func_args=None,
         expected_output=None,
     ),
+    TestData(
+        name='Pipeline with Loops',
+        pipeline_func=pipeline_with_loops,
+        pipeline_func_args={'loop_parameter': ['item1', 'item2', 'item3']},
+        expected_output=None,
+    ),
 ]
 
 docker_specific_pipeline_funcs = [
@@ -223,6 +234,18 @@ def idfn(val):
         },
         expected_output=None,
     ),
+    TestData(
+        name='Pipeline with ParallelFor Parallelism',
+        pipeline_func=pipeline_with_parallelfor_parallelism,
+        pipeline_func_args={'loop_parameter': ['item1', 'item2', 'item3']},
+        expected_output=None,
+    ),
+    TestData(
+        name='Pipeline with ParallelFor List Artifacts',
+        pipeline_func=pipeline_with_parallelfor_list_artifacts,
+        pipeline_func_args=None,
+        expected_output=None,
+    ),
 ]
 
 docker_specific_pipeline_funcs.extend(pipeline_func_data)
@@ -234,8 +257,17 @@ class TestDockerRunner:
     def setup_and_teardown(self):
         ws_root = f'{ws_root_base}_docker'
         pipeline_root = f'{pipeline_root_base}_docker'
+
+        # Create directories with proper permissions
         Path(ws_root).mkdir(exist_ok=True)
         Path(pipeline_root).mkdir(exist_ok=True)
+
+        # Set permissions to be writable by all (needed for Docker)
+        import stat
+        os.chmod(ws_root, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+        os.chmod(pipeline_root, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+
+        # Configure DockerRunner - let container run as default user
         local.init(
             runner=local.DockerRunner(),
             raise_on_error=True,
diff --git a/test_data/sdk_compiled_pipelines/valid/pipeline_with_parallelfor_parallelism.py b/test_data/sdk_compiled_pipelines/valid/pipeline_with_parallelfor_parallelism.py
index f477767dd64..a612bc8395a 100644
--- a/test_data/sdk_compiled_pipelines/valid/pipeline_with_parallelfor_parallelism.py
+++ b/test_data/sdk_compiled_pipelines/valid/pipeline_with_parallelfor_parallelism.py
@@ -79,7 +79,7 @@ def list_dict_maker_3() -> List:
         pipeline_package_path)
 
 
-@dsl.pipeline(name='pipeline-with-loops')
+@dsl.pipeline(name='pipeline-with-parallelfor-parallelism')
 def my_pipeline(loop_parameter: List[str]):
     # Loop argument is from a pipeline input
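
Illustrative usage (not part of the patch): a minimal sketch of the kind of user pipeline this change enables under the local DockerRunner. All component and pipeline names below are hypothetical; the sketch assumes a running Docker daemon and a kfp SDK build that includes this change. Because the root DAG contains an iterator (dsl.ParallelFor) and a trigger-policy condition (dsl.Condition), the has_control_flow check added to pipeline_orchestrator.py routes it to run_enhanced_dag instead of the plain DAG orchestrator.

# control_flow_example.py -- hypothetical example, not included in this patch
from kfp import dsl
from kfp import local


@dsl.component
def double(x: int) -> int:
    # Runs in its own container under local.DockerRunner().
    return 2 * x


@dsl.component
def print_msg(msg: str):
    print(msg)


@dsl.pipeline
def control_flow_pipeline(threshold: int = 2):
    # dsl.ParallelFor fans out one iteration task per item; the enhanced
    # orchestrator runs each iteration as a nested DAG with its own resource
    # name (e.g. .../for-loop-1-iteration-0).
    with dsl.ParallelFor(items=[1, 2, 3]) as item:
        double(x=item)

    # dsl.Condition compiles to a trigger_policy.condition on the wrapped
    # task, which is what the has_control_flow detection looks for; the task
    # only runs when the expression evaluates to true.
    with dsl.Condition(threshold > 1):
        print_msg(msg='threshold exceeded')


if __name__ == '__main__':
    local.init(runner=local.DockerRunner(), raise_on_error=True)
    control_flow_pipeline(threshold=3)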