Skip to content

Commit 1e2c9d2

Browse files
committed
Finalize pending status.
1 parent fde5668 commit 1e2c9d2

File tree

3 files changed

+9
-6
lines changed

3 files changed

+9
-6
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ dependencies = [
1515
"cloudpickle",
1616
"loky",
1717
"pluggy>=1.0.0",
18-
"pytask>=0.5.2",
18+
"git+https://github.com/pytask-dev/pytask@allow-setting-task-status",
1919
"rich",
2020
]
2121
dynamic = ["version"]

src/pytask_parallel/execute.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,15 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
100100
# Unfortunately, all submitted tasks are shown as running although some
101101
# are pending.
102102
#
103-
# Without coiled functions, we submit as many tasks as there are
104-
# available workers since we cannot reliably detect a pending status.
103+
# For all other backends, at least four more tasks are submitted and
104+
# otherwise 10% more. This is a heuristic to avoid submitting too few
105+
# tasks.
105106
#
106107
# See #98 for more information.
107108
if any_coiled_task:
108109
n_new_tasks = 10_000
109110
else:
110-
n_new_tasks = session.config["n_workers"] - len(running_tasks)
111+
n_new_tasks = max(4, int(session.config["n_workers"] * 0.1))
111112

112113
ready_tasks = (
113114
list(session.scheduler.get_ready(n_new_tasks))
@@ -185,6 +186,8 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
185186
newly_collected_reports.append(report)
186187
session.scheduler.done(task_signature)
187188

189+
# Check if tasks are not pending but running and update the live
190+
# status.
188191
elif live_execution and "_shared_memory" in session.config:
189192
if task_signature in session.config["_shared_memory"]:
190193
live_execution.update_task(

src/pytask_parallel/wrappers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def wrap_task_in_thread(
8181

8282
# Remove task from shared memory to indicate that it is no longer being executed.
8383
if shared_memory is not None:
84-
shared_memory.pop(task.signature)
84+
shared_memory.pop(task.signature, None)
8585

8686
return WrapperResult(
8787
carry_over_products=None, # type: ignore[arg-type]
@@ -177,7 +177,7 @@ def wrap_task_in_process( # noqa: PLR0913
177177

178178
# Remove task from shared memory to indicate that it is no longer being executed.
179179
if shared_memory is not None:
180-
shared_memory.pop(task.signature)
180+
shared_memory.pop(task.signature, None)
181181

182182
return WrapperResult(
183183
carry_over_products=products, # type: ignore[arg-type]

0 commit comments

Comments
 (0)