Skip to content

Commit 5c8a716

Browse files
authored
Make sleeping between scheduling dynamic. (#46)
1 parent 769b622 commit 5c8a716

10 files changed

+62
-116
lines changed

.pre-commit-config.yaml

+4-4
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ repos:
3131
rev: v2.0.0
3232
hooks:
3333
- id: setup-cfg-fmt
34-
- repo: https://github.com/myint/docformatter
35-
rev: v1.5.0-rc1
34+
- repo: https://github.com/PYCQA/docformatter
35+
rev: v1.5.0
3636
hooks:
3737
- id: docformatter
3838
args: [--in-place, --wrap-summaries, "88", --wrap-descriptions, "88", --blank]
@@ -67,7 +67,7 @@ repos:
6767
- id: interrogate
6868
args: [-v, --fail-under=40, src, tests]
6969
- repo: https://github.com/executablebooks/mdformat
70-
rev: 0.7.14
70+
rev: 0.7.15
7171
hooks:
7272
- id: mdformat
7373
additional_dependencies: [
@@ -76,7 +76,7 @@ repos:
7676
]
7777
args: [--wrap, "88"]
7878
- repo: https://github.com/codespell-project/codespell
79-
rev: v2.1.0
79+
rev: v2.2.1
8080
hooks:
8181
- id: codespell
8282
- repo: https://github.com/pre-commit/mirrors-mypy

CHANGES.md

+4-1
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@ chronological order. Releases follow [semantic versioning](https://semver.org/)
55
releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and
66
[Anaconda.org](https://anaconda.org/conda-forge/pytask-parallel).
77

8-
## 0.2.1 - 2022-08-xx
8+
## 0.2.1 - 2022-08-19
99

1010
- {pull}`43` adds docformatter.
1111
- {pull}`44` allows to capture warnings from subprocesses. Fixes {issue}`41`.
12+
- {pull}`45` replaces the delay command line option with an internal, dynamic parameter.
13+
Fixes {issue}`41`.
14+
- {pull}`46` adds a dynamic sleep duration during the execution. Fixes {issue}`42`.
1215

1316
## 0.2.0 - 2022-04-15
1417

src/pytask_parallel/build.py

-9
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,5 @@ def pytask_extend_command_line_interface(cli: click.Group) -> None:
2929
),
3030
default=None,
3131
),
32-
click.Option(
33-
["--delay"],
34-
help=(
35-
"Delay between checking whether tasks have finished. [default: 0.1 "
36-
"(seconds)]"
37-
),
38-
metavar="NUMBER > 0",
39-
default=None,
40-
),
4132
]
4233
cli.commands["build"].params.extend(additional_parameters)

src/pytask_parallel/callbacks.py

-16
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,3 @@ def parallel_backend_callback(value: Any) -> str | None:
3333
f"parallel_backend has to be one of {list(PARALLEL_BACKENDS)}."
3434
)
3535
return value
36-
37-
38-
def delay_callback(value: Any) -> float | None:
39-
"""Validate the delay option."""
40-
if value in [None, "None", "none"]:
41-
value = None
42-
else:
43-
try:
44-
value = float(value)
45-
except ValueError:
46-
pass
47-
48-
if not (isinstance(value, float) and value > 0):
49-
raise ValueError("delay has to be a number greater than 0.")
50-
51-
return value

src/pytask_parallel/config.py

+1-8
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
from pytask import hookimpl
99
from pytask_parallel.backends import PARALLEL_BACKENDS_DEFAULT
10-
from pytask_parallel.callbacks import delay_callback
1110
from pytask_parallel.callbacks import n_workers_callback
1211
from pytask_parallel.callbacks import parallel_backend_callback
1312

@@ -29,13 +28,7 @@ def pytask_parse_config(
2928
if config["n_workers"] == "auto":
3029
config["n_workers"] = max(os.cpu_count() - 1, 1)
3130

32-
config["delay"] = _get_first_non_none_value(
33-
config_from_cli,
34-
config_from_file,
35-
key="delay",
36-
default=0.1,
37-
callback=delay_callback,
38-
)
31+
config["delay"] = 0.1
3932

4033
config["parallel_backend"] = _get_first_non_none_value(
4134
config_from_cli,

src/pytask_parallel/execute.py

+31-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
from types import TracebackType
1010
from typing import Any
1111
from typing import Callable
12+
from typing import List
1213

14+
import attr
1315
import cloudpickle
1416
from pybaum.tree_util import tree_map
1517
from pytask import console
@@ -65,6 +67,7 @@ def pytask_execute_build(session: Session) -> bool | None:
6567
with parallel_backend(max_workers=session.config["n_workers"]) as executor:
6668

6769
session.executor = executor
70+
sleeper = _Sleeper()
6871

6972
while session.scheduler.is_active():
7073

@@ -96,6 +99,10 @@ def pytask_execute_build(session: Session) -> bool | None:
9699
running_tasks[task_name] = session.hook.pytask_execute_task(
97100
session=session, task=task
98101
)
102+
sleeper.reset()
103+
104+
if not ready_tasks:
105+
sleeper.increment()
99106

100107
for task_name in list(running_tasks):
101108
future = running_tasks[task_name]
@@ -146,7 +153,7 @@ def pytask_execute_build(session: Session) -> bool | None:
146153
if session.should_stop:
147154
break
148155
else:
149-
time.sleep(session.config["delay"])
156+
sleeper.sleep()
150157
except KeyboardInterrupt:
151158
break
152159

@@ -316,3 +323,26 @@ def _create_kwargs_for_task(task: Task) -> dict[Any, Any]:
316323
kwargs[arg_name] = tree_map(lambda x: x.value, attribute)
317324

318325
return kwargs
326+
327+
328+
@attr.s(kw_only=True)
329+
class _Sleeper:
330+
"""A sleeper that always sleeps a bit and up to 1 second if you don't wake it up.
331+
332+
This class controls when the next iteration of the execution loop starts. If new
333+
tasks are scheduled, the time spent sleeping is reset to a lower value.
334+
335+
"""
336+
337+
timings = attr.ib(type=List[float], default=[(i / 10) ** 2 for i in range(1, 11)])
338+
timing_idx = attr.ib(type=int, default=0)
339+
340+
def reset(self) -> None:
341+
self.timing_idx = 0
342+
343+
def increment(self) -> None:
344+
if self.timing_idx < len(self.timings) - 1:
345+
self.timing_idx += 1
346+
347+
def sleep(self) -> None:
348+
time.sleep(self.timings[self.timing_idx])

tests/test_callbacks.py

-19
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import pytest
66
from pytask_parallel.backends import PARALLEL_BACKENDS
7-
from pytask_parallel.callbacks import delay_callback
87
from pytask_parallel.callbacks import n_workers_callback
98
from pytask_parallel.callbacks import parallel_backend_callback
109

@@ -45,21 +44,3 @@ def test_n_workers_callback(value, expectation):
4544
def test_parallel_backend_callback(value, expectation):
4645
with expectation:
4746
parallel_backend_callback(value)
48-
49-
50-
@pytest.mark.unit
51-
@pytest.mark.parametrize(
52-
"value, expectation",
53-
[
54-
(-1, pytest.raises(ValueError)),
55-
(0.1, does_not_raise()),
56-
(1, does_not_raise()),
57-
("asdad", pytest.raises(ValueError)),
58-
(None, does_not_raise()),
59-
("None", does_not_raise()),
60-
("none", does_not_raise()),
61-
],
62-
)
63-
def test_delay_callback(value, expectation):
64-
with expectation:
65-
delay_callback(value)

tests/test_cli.py

-27
This file was deleted.

tests/test_config.py

-2
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ def test_interplay_between_debugging_and_parallel(tmp_path, pdb, n_workers, expe
3535
("n_workers", "auto", ExitCode.OK),
3636
("n_workers", 1, ExitCode.OK),
3737
("n_workers", 2, ExitCode.OK),
38-
("delay", 0.1, ExitCode.OK),
39-
("delay", 1, ExitCode.OK),
4038
("parallel_backend", "unknown_backend", ExitCode.CONFIGURATION_FAILED),
4139
]
4240
+ [

tests/test_execute.py

+22-29
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from pytask import main
1212
from pytask import Task
1313
from pytask_parallel.backends import PARALLEL_BACKENDS
14+
from pytask_parallel.execute import _Sleeper
1415
from pytask_parallel.execute import DefaultBackendNameSpace
1516
from pytask_parallel.execute import ProcessesNameSpace
1617

@@ -141,35 +142,6 @@ def myfunc():
141142
assert exception is None
142143

143144

144-
@pytest.mark.end_to_end
145-
@pytest.mark.parametrize("parallel_backend", PARALLEL_BACKENDS)
146-
def test_parallel_execution_delay(tmp_path, parallel_backend):
147-
source = """
148-
import pytask
149-
150-
@pytask.mark.produces("out_1.txt")
151-
def task_1(produces):
152-
produces.write_text("1")
153-
154-
@pytask.mark.produces("out_2.txt")
155-
def task_2(produces):
156-
produces.write_text("2")
157-
"""
158-
tmp_path.joinpath("task_dummy.py").write_text(textwrap.dedent(source))
159-
160-
session = main(
161-
{
162-
"paths": tmp_path,
163-
"delay": 3,
164-
"n_workers": 2,
165-
"parallel_backend": parallel_backend,
166-
}
167-
)
168-
169-
assert session.exit_code == ExitCode.OK
170-
assert 3 < session.execution_end - session.execution_start < 10
171-
172-
173145
@pytest.mark.end_to_end
174146
@pytest.mark.parametrize("parallel_backend", PARALLEL_BACKENDS)
175147
def test_stop_execution_when_max_failures_is_reached(tmp_path, parallel_backend):
@@ -325,3 +297,24 @@ def task_example(produces):
325297
warnings_block = result.output.split("Warnings")[1]
326298
assert "task_example.py::task_example[0]" in warnings_block
327299
assert "task_example.py::task_example[1]" in warnings_block
300+
301+
302+
def test_sleeper():
303+
sleeper = _Sleeper(timings=[1, 2, 3], timing_idx=0)
304+
305+
assert sleeper.timings == [1, 2, 3]
306+
assert sleeper.timing_idx == 0
307+
308+
sleeper.increment()
309+
assert sleeper.timing_idx == 1
310+
311+
sleeper.increment()
312+
assert sleeper.timing_idx == 2
313+
314+
sleeper.reset()
315+
assert sleeper.timing_idx == 0
316+
317+
start = time()
318+
sleeper.sleep()
319+
end = time()
320+
assert 1 <= end - start <= 2

0 commit comments

Comments
 (0)