diff --git a/docs/source/changes.md b/docs/source/changes.md
index bcaa1a6..163995d 100644
--- a/docs/source/changes.md
+++ b/docs/source/changes.md
@@ -25,6 +25,8 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and
 - {pull}`100` adds project management with rye.
 - {pull}`101` adds syncing for local paths as dependencies or products in remote
   environments with the same OS.
+- {pull}`106` fixes {pull}`99` so that all ready tasks are submitted only when
+  coiled functions are present.
 
 ## 0.4.1 - 2024-01-12
 
diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py
index 1ee0d71..c370c25 100644
--- a/src/pytask_parallel/execute.py
+++ b/src/pytask_parallel/execute.py
@@ -16,6 +16,7 @@
 from pytask import PTask
 from pytask import PythonNode
 from pytask import Session
+from pytask import TaskExecutionStatus
 from pytask import console
 from pytask import get_marks
 from pytask import hookimpl
@@ -52,6 +53,7 @@ def pytask_execute_build(session: Session) -> bool | None:  # noqa: C901, PLR091
     __tracebackhide__ = True
     reports = session.execution_reports
     running_tasks: dict[str, Future[Any]] = {}
+    any_coiled_task = any(is_coiled_function(task) for task in session.tasks)
 
     # The executor can only be created after the collection to give users the
     # possibility to inject their own executors.
@@ -66,12 +68,31 @@ def pytask_execute_build(session: Session) -> bool | None:  # noqa: C901, PLR091
         while session.scheduler.is_active():
             try:
                 newly_collected_reports = []
-                ready_tasks = list(session.scheduler.get_ready(10_000))
+
+                # If there is any coiled function, the user probably wants to exploit
+                # adaptive scaling. Thus, we need to submit all ready tasks.
+                # Unfortunately, all submitted tasks are shown as running although some
+                # are pending.
+                #
+                # Without coiled functions, we submit as many tasks as there are
+                # available workers since we cannot reliably detect a pending status.
+                #
+                # See #98 for more information.
+                if any_coiled_task:
+                    n_new_tasks = 10_000
+                else:
+                    n_new_tasks = session.config["n_workers"] - len(running_tasks)
+
+                ready_tasks = (
+                    list(session.scheduler.get_ready(n_new_tasks))
+                    if n_new_tasks >= 1
+                    else []
+                )
 
                 for task_name in ready_tasks:
                     task = session.dag.nodes[task_name]["task"]
                     session.hook.pytask_execute_task_log_start(
-                        session=session, task=task
+                        session=session, task=task, status=TaskExecutionStatus.RUNNING
                     )
                     try:
                         session.hook.pytask_execute_task_setup(
diff --git a/tests/test_execute.py b/tests/test_execute.py
index ac8a58c..31940f3 100644
--- a/tests/test_execute.py
+++ b/tests/test_execute.py
@@ -78,7 +78,6 @@ def task_2(path: Annotated[Path, Product] = Path("out_2.txt")):
 
 
 @pytest.mark.end_to_end()
-@pytest.mark.skip(reason="See #98")
 @pytest.mark.parametrize("parallel_backend", _IMPLEMENTED_BACKENDS)
 def test_stop_execution_when_max_failures_is_reached(tmp_path, parallel_backend):
     source = """
@@ -107,7 +106,6 @@ def task_3():
     time.sleep(3)
 
 @pytest.mark.end_to_end()
-@pytest.mark.skip(reason="See #98")
 @pytest.mark.parametrize("parallel_backend", _IMPLEMENTED_BACKENDS)
 def test_task_priorities(tmp_path, parallel_backend):
     source = """
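
For reviewers, here is the submission-throttling idea from the `execute.py` hunk pulled out into a self-contained sketch. It is illustrative only: `DummyScheduler`, `N_WORKERS`, and the sleeping task bodies are made-up stand-ins, not pytask's real scheduler, config, or hooks.

```python
from concurrent.futures import Future, ThreadPoolExecutor
import time

N_WORKERS = 2  # hypothetical stand-in for session.config["n_workers"]


class DummyScheduler:
    """Toy stand-in for pytask's topological scheduler."""

    def __init__(self, tasks):
        self._pending = list(tasks)

    def is_active(self):
        return bool(self._pending)

    def get_ready(self, n):
        ready, self._pending = self._pending[:n], self._pending[n:]
        return ready


def run(tasks, any_coiled_task=False):
    scheduler = DummyScheduler(tasks)
    running: dict[str, Future] = {}
    with ThreadPoolExecutor(max_workers=N_WORKERS) as executor:
        while scheduler.is_active() or running:
            # With coiled functions, over-submit so adaptive scaling can kick
            # in; otherwise cap submissions at the number of free workers.
            n_new = 10_000 if any_coiled_task else N_WORKERS - len(running)
            for name in scheduler.get_ready(n_new) if n_new >= 1 else []:
                running[name] = executor.submit(time.sleep, 0.01)
            # Reap finished futures so capacity frees up for the next round.
            for name in [n for n, f in running.items() if f.done()]:
                del running[name]


run([f"task_{i}" for i in range(5)])
```

The real hook does the same throttling inside its scheduling loop but additionally collects execution reports and runs setup and teardown hooks between rounds.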