Skip to content

Commit c8ed5ec

Browse files
authored
Release v0.0.4 and add loky as parallel backend. (#4)
1 parent 0d5fffa commit c8ed5ec

18 files changed

+136
-72
lines changed

Diff for: .conda/meta.yaml

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ requirements:
2626
test:
2727
requires:
2828
- pytest
29+
- loky
2930
source_files:
3031
- tox.ini
3132
- tests
@@ -41,4 +42,4 @@ about:
4142
home: https://github.com/pytask-dev/pytask-parallel
4243
license: none
4344
summary: Parallelize the execution of tasks.
44-
dev_url: https://github.com/pytask-dev/pytask-parallel/
45+
dev_url: https://github.com/pytask-dev/pytask-parallel

Diff for: CHANGES.rst

+8-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,14 @@ Changes
33

44
This is a record of all past pytask-parallel releases and what went into them in reverse
55
chronological order. Releases follow `semantic versioning <https://semver.org/>`_ and
6-
all releases are available on `Anaconda.org <https://anaconda.org/pytask/pytask-parallel>`_.
6+
all releases are available on `Anaconda.org
7+
<https://anaconda.org/pytask/pytask-parallel>`_.
8+
9+
10+
0.0.4 - 2020-10-30
11+
------------------
12+
13+
- :gh:`4` implement an executor with ``loky``.
714

815

916
0.0.3 - 2020-09-12

Diff for: README.rst

+25-9
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,38 @@ pytask-parallel
2121
Parallelize the execution of tasks with `pytask-parallel` which is a plugin for `pytask
2222
<https://github.com/pytask-dev/pytask>`_.
2323

24+
25+
Installation
26+
------------
27+
2428
Install the plugin via ``conda`` with
2529

26-
.. code-block:: bash
30+
.. code-block:: console
2731
2832
$ conda config --add channels conda-forge --add channels pytask
2933
$ conda install pytask-parallel
3034
3135
The plugin uses the ``ProcessPoolExecutor`` or ``ThreadPoolExecutor`` in the
3236
`concurrent.futures <https://docs.python.org/3/library/concurrent.futures.html>`_ module
33-
to execute tasks asynchronously.
37+
to execute tasks asynchronously. By default, processes are used for parallelization.
38+
39+
It is also possible to install ``loky`` with
40+
41+
.. code-block:: console
42+
43+
$ conda install -c conda-forge loky
44+
45+
which is a more robust implementation of the ``ProcessPoolExecutor`` and the default
46+
backend if installed.
47+
48+
49+
Usage
50+
-----
3451

3552
To parallelize your tasks across many workers, pass an integer greater than 1 or
3653
``'auto'`` to the command-line interface.
3754

38-
.. code-block:: bash
55+
.. code-block:: console
3956
4057
$ pytask -n 2
4158
$ pytask --n-workers 2
@@ -44,15 +61,14 @@ To parallelize your tasks across many workers, pass an integer greater than 1 or
4461
$ pytask -n auto
4562
4663
47-
By default, processes are used to parallelize the execution of tasks. This is useful for
48-
CPU bound tasks such as numerical computations. (`Here
49-
<https://stackoverflow.com/a/868577/7523785>`_ is an explanation on what CPU or IO bound
50-
means.)
64+
Using processes to parallelize the execution of tasks is useful for CPU bound tasks such
65+
as numerical computations. (`Here <https://stackoverflow.com/a/868577/7523785>`_ is an
66+
explanation of what CPU or IO bound means.)
5167

5268
For IO bound tasks, tasks where the limiting factor is network responses or accesses to
5369
files, you can parallelize via threads.
5470

55-
.. code-block:: bash
71+
.. code-block:: console
5672
5773
$ pytask --parallel-backend threads
5874
@@ -65,7 +81,7 @@ You can also set the options in one of the configuration files (``pytask.ini``,
6581
6682
[pytask]
6783
n_workers = 1
68-
parallel_backend = processes
84+
parallel_backend = processes # or loky if installed.
6985
7086
7187
Changes

Diff for: environment.yml

+2
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ dependencies:
1414
# Package dependencies
1515
- pytask >= 0.0.6
1616
- cloudpickle
17+
- loky
1718

1819
# Misc
20+
- black
1921
- bump2version
2022
- jupyterlab
2123
- matplotlib

Diff for: setup.cfg

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 0.0.3
2+
current_version = 0.0.4
33
parse = (?P<major>\d+)\.(?P<minor>\d+)(\.(?P<patch>\d+))(\-?((dev)?(?P<dev>\d+))?)
44
serialize =
55
{major}.{minor}.{patch}dev{dev}

Diff for: setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
setup(
55
name="pytask-parallel",
6-
version="0.0.3",
6+
version="0.0.4",
77
packages=find_packages(where="src"),
88
package_dir={"": "src"},
99
entry_points={"pytask": ["pytask_parallel = pytask_parallel.plugin"]},

Diff for: src/pytask_parallel/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.0.3"
1+
__version__ = "0.0.4"

Diff for: src/pytask_parallel/backends.py

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
"""This module configures the available backends."""
2+
from concurrent.futures import ProcessPoolExecutor
3+
from concurrent.futures import ThreadPoolExecutor
4+
5+
6+
PARALLEL_BACKENDS = {
7+
"processes": ProcessPoolExecutor,
8+
"threads": ThreadPoolExecutor,
9+
}
10+
11+
PARALLEL_BACKENDS_DEFAULT = "processes"
12+
13+
try:
14+
from loky import get_reusable_executor
15+
16+
PARALLEL_BACKENDS["loky"] = get_reusable_executor
17+
PARALLEL_BACKENDS_DEFAULT = "loky"
18+
except ImportError:
19+
pass

Diff for: src/pytask_parallel/build.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
"""Extend the build command."""
22
import click
33
from _pytask.config import hookimpl
4+
from pytask_parallel.backends import PARALLEL_BACKENDS
5+
from pytask_parallel.backends import PARALLEL_BACKENDS_DEFAULT
46

57

68
@hookimpl
@@ -18,8 +20,11 @@ def pytask_extend_command_line_interface(cli):
1820
),
1921
click.Option(
2022
["--parallel-backend"],
21-
type=click.Choice(["processes", "threads"]),
22-
help="Backend for the parallelization. [default: processes]",
23+
type=click.Choice(PARALLEL_BACKENDS),
24+
help=(
25+
"Backend for the parallelization. "
26+
f"[default: {PARALLEL_BACKENDS_DEFAULT}]"
27+
),
2328
default=None,
2429
),
2530
click.Option(

Diff for: src/pytask_parallel/callbacks.py

+10-5
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
"""Validate command line inputs and configuration values."""
2+
from pytask_parallel.backends import PARALLEL_BACKENDS
23

34

45
def n_workers_callback(value):
56
"""Validate the n-workers option."""
67
if value == "auto":
78
pass
8-
elif value is None or value == "None":
9+
elif value in [None, "None", "none"]:
910
value = None
1011
elif isinstance(value, int) and 1 <= value:
1112
pass
@@ -19,16 +20,20 @@ def n_workers_callback(value):
1920

2021
def parallel_backend_callback(value):
2122
"""Validate the input for the parallel backend."""
22-
if value == "None":
23+
if value in [None, "None", "none"]:
2324
value = None
24-
if value not in ["processes", "threads", None]:
25-
raise ValueError("parallel_backend has to be 'processes' or 'threads'.")
25+
elif value in PARALLEL_BACKENDS:
26+
pass
27+
else:
28+
raise ValueError(
29+
f"parallel_backend has to be one of {list(PARALLEL_BACKENDS)}."
30+
)
2631
return value
2732

2833

2934
def delay_callback(value):
3035
"""Validate the delay option."""
31-
if value is None or value == "None":
36+
if value in [None, "None", "none"]:
3237
value = None
3338
else:
3439
try:

Diff for: src/pytask_parallel/config.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from _pytask.config import hookimpl
55
from _pytask.shared import get_first_non_none_value
6+
from pytask_parallel.backends import PARALLEL_BACKENDS_DEFAULT
67
from pytask_parallel.callbacks import delay_callback
78
from pytask_parallel.callbacks import n_workers_callback
89
from pytask_parallel.callbacks import parallel_backend_callback
@@ -33,7 +34,7 @@ def pytask_parse_config(config, config_from_cli, config_from_file):
3334
config_from_cli,
3435
config_from_file,
3536
key="parallel_backend",
36-
default="processes",
37+
default=PARALLEL_BACKENDS_DEFAULT,
3738
callback=parallel_backend_callback,
3839
)
3940

Diff for: src/pytask_parallel/execute.py

+9-14
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,22 @@
1+
"""Contains code relevant to the execution."""
12
import sys
23
import time
3-
from concurrent.futures import ProcessPoolExecutor
4-
from concurrent.futures import ThreadPoolExecutor
54

65
import cloudpickle
76
import networkx as nx
87
from _pytask.config import hookimpl
98
from _pytask.report import ExecutionReport
9+
from pytask_parallel.backends import PARALLEL_BACKENDS
1010
from pytask_parallel.scheduler import TopologicalSorter
1111

12-
PARALLEL_BACKEND = {
13-
"processes": ProcessPoolExecutor,
14-
"threads": ThreadPoolExecutor,
15-
}
16-
1712

1813
@hookimpl
1914
def pytask_post_parse(config):
2015
"""Register the parallel backend."""
2116
if config["parallel_backend"] == "processes":
2217
config["pm"].register(ProcessesNameSpace)
23-
elif config["parallel_backend"] == "threads":
24-
config["pm"].register(ThreadsNameSpace)
18+
elif config["parallel_backend"] in ["threads", "loky"]:
19+
config["pm"].register(DefaultBackendNameSpace)
2520

2621

2722
@hookimpl(tryfirst=True)
@@ -57,7 +52,7 @@ def pytask_execute_build(session):
5752
reports = []
5853
running_tasks = {}
5954

60-
parallel_backend = PARALLEL_BACKEND[session.config["parallel_backend"]]
55+
parallel_backend = PARALLEL_BACKENDS[session.config["parallel_backend"]]
6156

6257
with parallel_backend(max_workers=session.config["n_workers"]) as executor:
6358

@@ -149,20 +144,20 @@ def pytask_execute_task(session, task): # noqa: N805
149144
def unserialize_and_execute_task(bytes_):
150145
"""Unserialize and execute task.
151146
152-
This function receives bytes and unpickles them to a task which is them execute in a
153-
spawned process or thread.
147+
This function receives bytes and unpickles them to a task which is then executed
148+
in a spawned process or thread.
154149
155150
"""
156151
task = cloudpickle.loads(bytes_)
157152
task.execute()
158153

159154

160-
class ThreadsNameSpace:
155+
class DefaultBackendNameSpace:
161156
@hookimpl(tryfirst=True)
162157
def pytask_execute_task(session, task): # noqa: N805
163158
"""Execute a task.
164159
165-
Since threads share their memory, it is not necessary to pickle and unpickle the
160+
Since threads have shared memory, it is not necessary to pickle and unpickle the
166161
task.
167162
168163
"""

Diff for: src/pytask_parallel/logging.py

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
"""Contains code relevant to logging."""
2+
import click
3+
from _pytask.config import hookimpl
4+
5+
6+
@hookimpl(trylast=True)
7+
def pytask_log_session_header(session):
8+
"""Add a note for how many workers are spawned."""
9+
n_workers = session.config["n_workers"]
10+
if n_workers > 1:
11+
click.echo(f"Started {n_workers} workers.")

Diff for: src/pytask_parallel/plugin.py

+2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from pytask_parallel import build
44
from pytask_parallel import config
55
from pytask_parallel import execute
6+
from pytask_parallel import logging
67

78

89
@hookimpl
@@ -11,3 +12,4 @@ def pytask_add_hooks(pm):
1112
pm.register(build)
1213
pm.register(config)
1314
pm.register(execute)
15+
pm.register(logging)

Diff for: tests/test_callbacks.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from contextlib import ExitStack as does_not_raise # noqa: N813
22

33
import pytest
4+
from pytask_parallel.backends import PARALLEL_BACKENDS
45
from pytask_parallel.callbacks import delay_callback
56
from pytask_parallel.callbacks import n_workers_callback
67
from pytask_parallel.callbacks import parallel_backend_callback
@@ -17,6 +18,7 @@
1718
("asdad", pytest.raises(ValueError)),
1819
(None, does_not_raise()),
1920
("None", does_not_raise()),
21+
("none", does_not_raise()),
2022
("1", does_not_raise()),
2123
("1.1", pytest.raises(ValueError)),
2224
],
@@ -30,13 +32,13 @@ def test_n_workers_callback(value, expectation):
3032
@pytest.mark.parametrize(
3133
"value, expectation",
3234
[
33-
("threads", does_not_raise()),
34-
("processes", does_not_raise()),
3535
(1, pytest.raises(ValueError)),
3636
("asdad", pytest.raises(ValueError)),
3737
(None, does_not_raise()),
3838
("None", does_not_raise()),
39-
],
39+
("none", does_not_raise()),
40+
]
41+
+ [(i, does_not_raise()) for i in PARALLEL_BACKENDS],
4042
)
4143
def test_parallel_backend_callback(value, expectation):
4244
with expectation:
@@ -53,6 +55,7 @@ def test_parallel_backend_callback(value, expectation):
5355
("asdad", pytest.raises(ValueError)),
5456
(None, does_not_raise()),
5557
("None", does_not_raise()),
58+
("none", does_not_raise()),
5659
],
5760
)
5861
def test_delay_callback(value, expectation):

0 commit comments

Comments
 (0)