
[sdk] Error compiling with ParallelFor and PipelineParam #10592

Open
jsilva opened this issue Mar 20, 2024 · 12 comments

Comments

@jsilva

jsilva commented Mar 20, 2024

Environment

  • KFP version: N/A - it does not compile

  • KFP SDK version: kfp 2.7.0

  • All dependencies version:

pip list | grep kfp

kfp                2.7.0
kfp-pipeline-spec  0.3.0
kfp-server-api     2.0.5

Steps to reproduce

Using the following script I get an unexpected error:

from kfp.v2 import dsl, compiler

@dsl.component()
def print_op(message: str, shard: int) -> None:
    print(f"Shard: {shard}")
    print(f"Message: {message}")


@dsl.pipeline()
def my_pipeline(
        processing_date: str = "2021-01-01",
):
    shards = [1, 2, 3]
    with dsl.ParallelFor(shards, parallelism=len(shards)) as shard:
        one = print_op(message=processing_date, shard=shard)

    two = print_op(message=processing_date, shard=1).after(one)


compiler.Compiler().compile(
    pipeline_func=my_pipeline, package_path="pipeline.yaml"
)

or adding the following test

    def test_valid_parallelfor_5(self):

        @dsl.pipeline
        def my_pipeline(test_arg: str = 'foo'):
            with dsl.ParallelFor([1, 2, 3]):
                one = print_and_return(text=test_arg)

            # refers to all instances of one
            two = print_and_return(text=test_arg).after(one)

        self.assertTrue(my_pipeline.pipeline_spec)

to the Python SDK compiler_test.py will do the trick.

With the above I get the following error:

/Users/xxxxxxxx/work/mlops/wanna/wanna-customers/wanna-kraken/quick_kfp.py:1: DeprecationWarning: The module `kfp.v2` is deprecated and will be removed in a future version. Please import directly from the `kfp` namespace, instead of `kfp.v2`.

  from kfp.v2 import dsl, compiler
/Users/xxxxxxxx/Library/Caches/pypoetry/virtualenvs/wanna-kraken-8nJaTdMD-py3.10/lib/python3.10/site-packages/kfp/dsl/component_decorator.py:119: FutureWarning: Python 3.7 has reached end-of-life. The default base_image used by the @dsl.component decorator will switch from 'python:3.7' to 'python:3.8' on April 23, 2024. To ensure your existing components work with versions of the KFP SDK released after that date, you should provide an explicit base_image argument and ensure your component works as intended on Python 3.8.
  return component_factory.create_component_from_func(
Traceback (most recent call last):
  File "/Users/xxxxxxxx/work/mlops/wanna/wanna-customers/wanna-kraken/quick_kfp.py", line 10, in <module>
    def my_pipeline(
  File "/Users/xxxxxxxx/Library/Caches/pypoetry/virtualenvs/wanna-kraken-8nJaTdMD-py3.10/lib/python3.10/site-packages/kfp/dsl/pipeline_context.py", line 65, in pipeline
    return component_factory.create_graph_component_from_func(
  File "/Users/xxxxxxxx/Library/Caches/pypoetry/virtualenvs/wanna-kraken-8nJaTdMD-py3.10/lib/python3.10/site-packages/kfp/dsl/component_factory.py", line 673, in create_graph_component_from_func
    return graph_component.GraphComponent(
  File "/Users/xxxxxxxx/Library/Caches/pypoetry/virtualenvs/wanna-kraken-8nJaTdMD-py3.10/lib/python3.10/site-packages/kfp/dsl/graph_component.py", line 68, in __init__
    pipeline_spec, platform_spec = builder.create_pipeline_spec(
  File "/Users/xxxxxxxx/Library/Caches/pypoetry/virtualenvs/wanna-kraken-8nJaTdMD-py3.10/lib/python3.10/site-packages/kfp/compiler/pipeline_spec_builder.py", line 1909, in create_pipeline_spec
    dependencies = compiler_utils.get_dependencies(
  File "/Users/xxxxxxxx/Library/Caches/pypoetry/virtualenvs/wanna-kraken-8nJaTdMD-py3.10/lib/python3.10/site-packages/kfp/compiler/compiler_utils.py", line 762, in get_dependencies
    upstream_tasks_that_downstream_consumers_from = [
  File "/Users/xxxxxxxx/Library/Caches/pypoetry/virtualenvs/wanna-kraken-8nJaTdMD-py3.10/lib/python3.10/site-packages/kfp/compiler/compiler_utils.py", line 763, in <listcomp>
    channel.task.name for channel in task._channel_inputs
    
AttributeError: 'NoneType' object has no attribute 'name'

Expected result

  1. pipeline.yaml available on disk ready for JobSubmit

Materials and Reference

This issue is related to the changes in #10257.
Adding a check that channel.task is not None at https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/compiler/compiler_utils.py#L763 makes the above test pass.
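
A minimal sketch of that check, assuming the list comprehension in get_dependencies() is the only place that dereferences channel.task:

# Hypothetical guard: channels fed by pipeline parameters have no producing
# task (channel.task is None), so keep only channels produced by real tasks.
upstream_tasks_that_downstream_consumers_from = [
    channel.task.name
    for channel in task._channel_inputs
    if channel.task is not None
]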


Impacted by this bug? Give it a 👍.


This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the lifecycle/stale The issue / pull request is stale, any activities remove this label. label May 20, 2024
@HumairAK
Collaborator

this looks like it's still relevant

@stale stale bot removed the lifecycle/stale The issue / pull request is stale, any activities remove this label. label May 22, 2024
@jsilva
Author

jsilva commented Jun 20, 2024

This bug is still present in SDK 2.2.0.

@agkphysics

Since presumably only tasks need to be checked here, pipeline parameters can probably be ignored:

upstream_tasks_that_downstream_consumers_from = [
    channel.task.name for channel in task._channel_inputs
]
has_data_exchange = upstream_task.name in upstream_tasks_that_downstream_consumers_from

Like so:

upstream_tasks_that_downstream_consumers_from = [
    channel.task.name for channel in task._channel_inputs if channel.task is not None
]

@benbs

benbs commented Sep 1, 2024

Any news about this? Or is there any known workaround to make it work (other than removing the PipelineParameterChannel arguments)?

@jackward-countdown

Also looking for info on this. The only way I know to work around it is to have the components in the loop produce an output and collect them, which has the side effect of limiting parallelism to 100.
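
A minimal sketch of that fan-in workaround, assuming a hypothetical consume_all component that accepts the collected outputs; dsl.Collected replaces the .after() call, so the downstream task depends on all loop iterations through data rather than an explicit ordering:

from typing import List

from kfp import compiler, dsl


@dsl.component
def print_and_return(text: str) -> str:
    print(text)
    return text


@dsl.component
def consume_all(texts: List[str]):
    # Receives the collected outputs of every loop iteration.
    print(texts)


@dsl.pipeline
def my_pipeline(test_arg: str = 'foo'):
    with dsl.ParallelFor([1, 2, 3]):
        one = print_and_return(text=test_arg)
    # Fan-in via dsl.Collected creates a data dependency on all iterations,
    # avoiding the .after(one) call that triggers the compiler error.
    consume_all(texts=dsl.Collected(one.output))


compiler.Compiler().compile(my_pipeline, package_path='pipeline.yaml')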

@mai-nakagawa
Contributor

mai-nakagawa commented Sep 27, 2024

Workaround suggestion.

The problem seems to happen when a pipeline parameter is used by a task that comes after a ParallelFor. In the following example, the pipeline parameter param is used by the print_op task that follows the ParallelFor; the code fails with kfp v2.9.0.

from kfp import compiler, dsl
from typing import List


@dsl.component
def args_generator_op() -> List[str]:
    return ["1", "2", "3"]


@dsl.component
def print_op(s: str):
    print(s)


@dsl.pipeline(name='pipeline-with-loop-output-v2')
def my_pipeline(param: str):
    task1 = args_generator_op()
    with dsl.ParallelFor(task1.output) as item:
        task2 = print_op(s=item).after(task1)
    print_op(s=param).after(task2)


if __name__ == '__main__':
    compiler.Compiler().compile(my_pipeline, __file__ + '.yaml')

A possible workaround is to 1) wrap the pipeline function in an outer function and 2) pass the parameters to that outer function instead. In the following example, the parameter param is passed to the outer function generate_pipeline:

from kfp import compiler, dsl
from typing import List


@dsl.component
def args_generator_op() -> List[str]:
    return ["1", "2", "3"]


@dsl.component
def print_op(s: str):
    print(s)


def generate_pipeline(param: str):

    @dsl.pipeline(name='pipeline-with-loop-output-v2')
    def my_pipeline():
        task1 = args_generator_op()
        with dsl.ParallelFor(task1.output) as item:
            task2 = print_op(s=item).after(task1)
        print_op(s=param).after(task2)

    return my_pipeline

if __name__ == '__main__':
    my_pipeline = generate_pipeline(param="foo")
    compiler.Compiler().compile(my_pipeline, __file__ + '.yaml')

It works in my situation.


This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the lifecycle/stale The issue / pull request is stale, any activities remove this label. label Nov 26, 2024
@stale stale bot removed the lifecycle/stale The issue / pull request is stale, any activities remove this label. label Nov 27, 2024
@HumairAK
Collaborator

/lifecycle frozen

@farridav

Hitting this same issue with KFP on Vertex Pipelines. Are there any known workarounds?

@racinmat

racinmat commented Jan 8, 2025

Hi, any plan to fix this issue?

@farridav

farridav commented Jan 8, 2025

I have a fix for this (at least one that passes my use case) here: #11476

It appears to be failing on a flaky test. My time to work through those issues is fairly limited, so I'm hoping it resolves itself with a retry, given how little code I've actually changed.
