Skip to content

Commit 0b0cd52

Browse files
authored
Schedule only all ready tasks when there are coiled functions. (#106)
1 parent aedb942 commit 0b0cd52

File tree

3 files changed

+25
-4
lines changed

3 files changed

+25
-4
lines changed

docs/source/changes.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and
2525
- {pull}`100` adds project management with rye.
2626
- {pull}`101` adds syncing for local paths as dependencies or products in remote
2727
environments with the same OS.
28+
- {pull}`106` fixes {pull}`99` such that only when there are coiled functions, all ready
29+
tasks are submitted.
2830

2931
## 0.4.1 - 2024-01-12
3032

src/pytask_parallel/execute.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from pytask import PTask
1717
from pytask import PythonNode
1818
from pytask import Session
19+
from pytask import TaskExecutionStatus
1920
from pytask import console
2021
from pytask import get_marks
2122
from pytask import hookimpl
@@ -52,6 +53,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
5253
__tracebackhide__ = True
5354
reports = session.execution_reports
5455
running_tasks: dict[str, Future[Any]] = {}
56+
any_coiled_task = any(is_coiled_function(task) for task in session.tasks)
5557

5658
# The executor can only be created after the collection to give users the
5759
# possibility to inject their own executors.
@@ -66,12 +68,31 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
6668
while session.scheduler.is_active():
6769
try:
6870
newly_collected_reports = []
69-
ready_tasks = list(session.scheduler.get_ready(10_000))
71+
72+
# If there is any coiled function, the user probably wants to exploit
73+
# adaptive scaling. Thus, we need to submit all ready tasks.
74+
# Unfortunately, all submitted tasks are shown as running although some
75+
# are pending.
76+
#
77+
# Without coiled functions, we submit as many tasks as there are
78+
# available workers since we cannot reliably detect a pending status.
79+
#
80+
# See #98 for more information.
81+
if any_coiled_task:
82+
n_new_tasks = 10_000
83+
else:
84+
n_new_tasks = session.config["n_workers"] - len(running_tasks)
85+
86+
ready_tasks = (
87+
list(session.scheduler.get_ready(n_new_tasks))
88+
if n_new_tasks >= 1
89+
else []
90+
)
7091

7192
for task_name in ready_tasks:
7293
task = session.dag.nodes[task_name]["task"]
7394
session.hook.pytask_execute_task_log_start(
74-
session=session, task=task
95+
session=session, task=task, status=TaskExecutionStatus.RUNNING
7596
)
7697
try:
7798
session.hook.pytask_execute_task_setup(

tests/test_execute.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ def task_2(path: Annotated[Path, Product] = Path("out_2.txt")):
7878

7979

8080
@pytest.mark.end_to_end()
81-
@pytest.mark.skip(reason="See #98")
8281
@pytest.mark.parametrize("parallel_backend", _IMPLEMENTED_BACKENDS)
8382
def test_stop_execution_when_max_failures_is_reached(tmp_path, parallel_backend):
8483
source = """
@@ -107,7 +106,6 @@ def task_3(): time.sleep(3)
107106

108107

109108
@pytest.mark.end_to_end()
110-
@pytest.mark.skip(reason="See #98")
111109
@pytest.mark.parametrize("parallel_backend", _IMPLEMENTED_BACKENDS)
112110
def test_task_priorities(tmp_path, parallel_backend):
113111
source = """

0 commit comments

Comments
 (0)