diff --git a/docs/source/coiled.md b/docs/source/coiled.md
new file mode 100644
index 0000000..d40b475
--- /dev/null
+++ b/docs/source/coiled.md
@@ -0,0 +1,103 @@
+# coiled
+
+```{caution}
+Currently, the coiled backend can only be used if your workflow code is organized in a
+package due to how pytask imports your code and dask serializes task functions
+([issue](https://github.com/dask/distributed/issues/8607)).
+```
+
+[coiled](https://www.coiled.io/) is a product built on top of dask that eases the
+deployment of your workflow to many cloud providers like AWS, GCP, and Azure.
+
+Note that coiled is a paid service. They offer a
+[free monthly tier](https://www.coiled.io/pricing) where you only need to pay the costs
+for your cloud provider and you can get started without a credit card.
+
+They provide the following benefits, which are especially helpful to people who are not
+familiar with cloud providers or remote computing.
+
+- coiled manages your resources by spawning workers if you need them and shutting them
+  down if they are idle.
+- [Synchronization](https://docs.coiled.io/user_guide/software/sync.html) of your local
+  environment to remote workers.
+- [Adaptive scaling](https://docs.dask.org/en/latest/adaptive.html) if your workflow
+  takes a long time to finish.
+
+There are two ways to use coiled with pytask and pytask-parallel.
+
+1. Run individual tasks in the cloud.
+1. Run your whole workflow in the cloud.
+
+Both approaches are explained below, after the setup.
+
+## Setup
+
+Follow coiled's
+[four-step short process](https://docs.coiled.io/user_guide/setup/index.html) to set up
+your local environment and configure your cloud provider.
+
+## Running individual tasks
+
+In most projects there are just a couple of tasks that require a lot of resources and
+that you would like to run in a virtual machine in the cloud.
+
+With coiled's
+[serverless functions](https://docs.coiled.io/user_guide/usage/functions/index.html),
+you can define the hardware and software environment for your task. Just decorate the
+task function with a {func}`@coiled.function <coiled.function>` decorator.
+
+```{literalinclude} ../../docs_src/coiled/coiled_functions.py
+```
+
+To execute the workflow, you need to turn on parallelization by requesting two or more
+workers or specifying one of the parallel backends. Otherwise, the decorated task is run
+locally.
+
+```console
+pytask -n 2
+pytask --parallel-backend loky
+```
+
+When you apply the {func}`@task <pytask.task>` decorator to the task, make sure the
+{func}`@coiled.function <coiled.function>` decorator is applied first, i.e., closer to
+the function. Otherwise, it will be ignored. Add more arguments to the decorator to
+configure the hardware and software environment.
+
+```{literalinclude} ../../docs_src/coiled/coiled_functions_task.py
+```
+
+```{seealso}
+Serverless functions are more thoroughly explained in
+[coiled's guide](https://docs.coiled.io/user_guide/usage/functions/index.html).
+```
+
+(coiled-clusters)=
+
+## Running a cluster
+
+It is also possible to launch a cluster and run each task in a worker provided by
+coiled. Usually, this is not necessary, and you are better off using coiled's serverless
+functions.
+
+If you want to launch a cluster managed by coiled, register a function that builds an
+executor using {class}`coiled.Cluster`.
+
+```python
+import coiled
+from pytask_parallel import ParallelBackend
+from pytask_parallel import registry
+from concurrent.futures import Executor
+
+
+def _build_coiled_executor(n_workers: int) -> Executor:
+    return coiled.Cluster(n_workers=n_workers).get_client().get_executor()
+
+
+registry.register_parallel_backend(ParallelBackend.CUSTOM, _build_coiled_executor)
+```
+
+Then, execute your workflow with
+
+```console
+pytask --parallel-backend custom
+```
diff --git a/docs/source/conf.py b/docs/source/conf.py
index f537001..c8d69ce 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -87,6 +87,7 @@
     "coiled": ("https://docs.coiled.io/", None),
     "dask": ("https://docs.dask.org/en/stable/", None),
     "distributed": ("https://distributed.dask.org/en/stable/", None),
+    "pytask": ("https://pytask-dev.readthedocs.io/en/stable/", None),
     "python": ("https://docs.python.org/3.10", None),
 }
diff --git a/docs/source/custom_executors.md b/docs/source/custom_executors.md
index f44b377..715da68 100644
--- a/docs/source/custom_executors.md
+++ b/docs/source/custom_executors.md
@@ -15,43 +15,19 @@ In some cases, adding a new backend can be as easy as registering a builder func
 that receives some arguments (currently only `n_workers`) and returns the instantiated
 executor.
 
-```python
-from concurrent.futures import Executor
-from my_project.executor import CustomExecutor
-
-from pytask_parallel import ParallelBackend, registry
-
-
-def build_custom_executor(n_workers: int) -> Executor:
-    return CustomExecutor(max_workers=n_workers)
-
-
-registry.register_parallel_backend(ParallelBackend.CUSTOM, build_custom_executor)
+```{literalinclude} ../../docs_src/custom_executors.py
 ```
 
+Given the {class}`pytask_parallel.WorkerType`, pytask applies automatic wrappers around
+the task function to collect tracebacks, capture stdout/stderr, and the like. The
+`remote` keyword allows pytask to handle local paths automatically for remote clusters.
+
 Now, build the project requesting your custom backend.
 
 ```console
 pytask --parallel-backend custom
 ```
 
-Realistically, it is not the only necessary adjustment for a nice user experience. There
-are two other important things. pytask-parallel does not implement them by default since
-it seems more tightly coupled to your backend.
-
-1. A wrapper for the executed function that captures warnings, catches exceptions and
-   saves products of the task (within the child process!).
-
-   As an example, see
-   [`def _execute_task()`](https://github.com/pytask-dev/pytask-parallel/blob/c441dbb75fa6ab3ab17d8ad5061840c802dc1c41/src/pytask_parallel/processes.py#L91-L155)
-   that does all that for the processes and loky backend.
-
-1. To apply the wrapper, you need to write a custom hook implementation for
-   `def pytask_execute_task()`. See
-   [`def pytask_execute_task()`](https://github.com/pytask-dev/pytask-parallel/blob/c441dbb75fa6ab3ab17d8ad5061840c802dc1c41/src/pytask_parallel/processes.py#L41-L65)
-   for an example. Use the
-   [`hook_module`](https://pytask-dev.readthedocs.io/en/stable/how_to_guides/extending_pytask.html#using-hook-module-and-hook-module)
-   configuration value to register your implementation.
-
-Another example of an implementation can be found as a
-[test](https://github.com/pytask-dev/pytask-parallel/blob/c441dbb75fa6ab3ab17d8ad5061840c802dc1c41/tests/test_backends.py#L35-L78).
+```{important}
+pytask applies automatic wrappers around the task function; you need not add your own.
+```
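
Both the page above and `docs_src/custom_executors.py` import a `CustomExecutor` from a
hypothetical `my_project.executor` module that is never shown. The sketch below is one
possible shape for such a class, assuming nothing beyond the standard
`concurrent.futures.Executor` interface and the constructor arguments used in the docs;
it is illustrative only and not part of the changes above.

```python
# Hypothetical my_project/executor.py. A real executor would hand submitted callables
# to a pool or cluster; this sketch runs them eagerly to stay self-contained.
from __future__ import annotations

from concurrent.futures import Executor
from concurrent.futures import Future
from typing import Any
from typing import Callable

from pytask_parallel import WorkerType


class CustomExecutor(Executor):
    def __init__(self, max_workers: int, worker_type: WorkerType, remote: bool) -> None:
        self.max_workers = max_workers
        self.worker_type = worker_type
        self.remote = remote

    def submit(
        self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any
    ) -> Future[Any]:
        # Execute the callable immediately and wrap the outcome in a Future, which is
        # all pytask-parallel needs back from an executor.
        future: Future[Any] = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            future.set_exception(exc)
        return future
```

pytask-parallel only interacts with the returned futures, so any object offering this
interface, for example a thin wrapper around an HPC scheduler client, can be plugged in
through `register_parallel_backend`.
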
diff --git a/docs/source/dask.md b/docs/source/dask.md
index e0b2ac1..99eabea 100644
--- a/docs/source/dask.md
+++ b/docs/source/dask.md
@@ -81,48 +81,17 @@ You can find more information in the documentation for
 [`dask.distributed`](https://distributed.dask.org/en/stable/).
 ```
 
-## Remote - Using cloud providers with coiled
+## Remote
+
+You can learn how to deploy your tasks to a remote dask cluster in [this
+guide](https://docs.dask.org/en/stable/deploying.html). The dask documentation
+recommends using coiled for deployment to cloud providers.
 
 [coiled](https://www.coiled.io/) is a product built on top of dask that eases the
 deployment of your workflow to many cloud providers like AWS, GCP, and Azure.
 
-They offer a [free monthly tier](https://www.coiled.io/pricing) where you only
-need to pay the costs for your cloud provider and you can get started without a credit
-card.
-
-Furthermore, they offer the following benefits which are especially helpful to people
-who are not familiar with cloud providers or remote computing.
-
-- A [four step short process](https://docs.coiled.io/user_guide/setup/index.html) to set
-  up your local environment and configure your cloud provider.
-- coiled manages your resources by spawning workers if you need them and shutting them
-  down if they are idle.
-- Synchronization of your local environment to remote workers.
-
-So, how can you run your pytask workflow on a cloud infrastructure with coiled?
-
-1. Follow their [guide on getting
-   started](https://docs.coiled.io/user_guide/setup/index.html) by creating a coiled
-   account and syncing it with your cloud provider.
-
-1. Register a function that builds an executor using {class}`coiled.Cluster`.
-
-   ```python
-   import coiled
-   from pytask_parallel import ParallelBackend
-   from pytask_parallel import registry
-   from concurrent.futures import Executor
-
-
-   def _build_coiled_executor(n_workers: int) -> Executor:
-       return coiled.Cluster(n_workers=n_workers).get_client().get_executor()
-
-
-   registry.register_parallel_backend(ParallelBackend.CUSTOM, _build_coiled_executor)
-   ```
-
-1. Execute your workflow with
+If you want to run the tasks in your project on a cluster managed by coiled, read
+{ref}`this guide <coiled-clusters>`.
 
-   ```console
-   pytask --parallel-backend custom
-   ```
+Otherwise, follow the instructions in [dask's
+guide](https://docs.dask.org/en/stable/deploying.html).
diff --git a/docs/source/index.md b/docs/source/index.md
index 6209512..d362a0f 100644
--- a/docs/source/index.md
+++ b/docs/source/index.md
@@ -23,6 +23,7 @@ pytask-parallel allows to execute workflows defined with
 maxdepth: 1
 ---
 quickstart
+coiled
 dask
 custom_executors
 developers_guide
diff --git a/docs/source/quickstart.md b/docs/source/quickstart.md
index b780fcc..dd583cb 100644
--- a/docs/source/quickstart.md
+++ b/docs/source/quickstart.md
@@ -18,7 +18,9 @@ $ conda install -c conda-forge pytask-parallel
 When the plugin is only installed and pytask executed, the tasks are not run in
 parallel.
 
-For parallelization with the default backend [loky](https://loky.readthedocs.io/), you need to launch multiple workers.
+For parallelization with the default backend [loky](https://loky.readthedocs.io/), you
+need to launch multiple workers or specify the parallel backend explicitly. Here is how
+you launch multiple workers.
 
 `````{tab-set}
 ````{tab-item} CLI
@@ -45,8 +47,8 @@ n_workers = "auto"
 ````
 `````
 
-To use a different backend, pass the `--parallel-backend` option. The following command
-will execute the workflow with one worker and the loky backend.
+To specify the parallel backend, pass the `--parallel-backend` option. The following
+command will execute the workflow with one worker and the loky backend.
 
 `````{tab-set}
 ````{tab-item} CLI
@@ -72,23 +74,32 @@ parallel_backend = "loky"
 It is not possible to combine parallelization with debugging. That is why `--pdb` or
 `--trace` deactivate parallelization.
 
-If you parallelize the execution of your tasks using two or more workers, do not use
-`breakpoint()` or `import pdb; pdb.set_trace()` since both will cause exceptions.
+If you parallelize the execution of your tasks, do not use `breakpoint()` or
+`import pdb; pdb.set_trace()` since both will cause exceptions.
 ```
 
 ### loky
 
 There are multiple backends available. The default is the backend provided by loky which
-aims to be a more robust implementation of {class}`~multiprocessing.pool.Pool` and in
+is a more robust implementation of {class}`~multiprocessing.pool.Pool` and
 {class}`~concurrent.futures.ProcessPoolExecutor`.
 
 ```console
 pytask --parallel-backend loky
 ```
 
-As it spawns workers in new processes to run the tasks, it is especially suited for
-CPU-bound tasks. ([Here](https://stackoverflow.com/a/868577/7523785) is an
-explanation of what CPU- or IO-bound means.)
+A parallel backend with processes is especially suited for CPU-bound tasks as it spawns
+workers in new processes to run the tasks.
+([Here](https://stackoverflow.com/a/868577/7523785) is an explanation of what CPU- or
+IO-bound means.)
+
+### coiled
+
+pytask-parallel integrates with coiled, allowing you to run tasks in virtual machines on
+AWS, Azure, and GCP. You can decide whether to run only selected tasks or the whole
+project in the cloud.
+
+Read more about coiled in [this guide](coiled.md).
 
 ### `concurrent.futures`
@@ -141,11 +152,10 @@ Capturing warnings is not thread-safe. Therefore, warnings cannot be captured re
 when tasks are parallelized with `--parallel-backend threads`.
 ```
 
-### dask + coiled
+### dask
 
-dask and coiled together provide the option to execute your workflow on cloud providers
-like AWS, GCP or Azure. Check out the [dedicated guide](dask.md) if you are interested
-in that.
+dask allows you to run your workflows on many different kinds of clusters, like cloud
+clusters and traditional HPC clusters.
 
 Using the default mode, dask will spawn multiple local workers to process the tasks.
diff --git a/docs_src/coiled/coiled_functions.py b/docs_src/coiled/coiled_functions.py
new file mode 100644
index 0000000..ca1a375
--- /dev/null
+++ b/docs_src/coiled/coiled_functions.py
@@ -0,0 +1,6 @@
+import coiled
+
+
+@coiled.function()
+def task_example() -> None:
+    pass
diff --git a/docs_src/coiled/coiled_functions_task.py b/docs_src/coiled/coiled_functions_task.py
new file mode 100644
index 0000000..51d2665
--- /dev/null
+++ b/docs_src/coiled/coiled_functions_task.py
@@ -0,0 +1,12 @@
+import coiled
+from pytask import task
+
+
+@task
+@coiled.function(
+    region="eu-central-1",  # Run the task close to you.
+    memory="512 GB",  # Use a lot of memory.
+    cpu=128,  # Use a lot of CPU.
+    vm_type="p3.2xlarge",  # Run a GPU instance.
+)
+def task_example() -> None: ...
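
The two example files above only define no-op tasks. A slightly fuller, hypothetical
variant (not part of this diff) shows a coiled task that writes a product; it assumes
pytask's `Product` annotation behaves the same under `@coiled.function`, and the region
and memory values are purely illustrative.

```python
from pathlib import Path
from typing import Annotated

import coiled
from pytask import Product


@coiled.function(region="eu-central-1", memory="16 GB")
def task_example(path: Annotated[Path, Product] = Path("result.txt")) -> None:
    # The file is written on the remote worker; the docs above state that pytask
    # handles local paths automatically for remote workers.
    path.write_text("hello from the cloud")
```
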
diff --git a/docs_src/custom_executors.py b/docs_src/custom_executors.py
new file mode 100644
index 0000000..3e13527
--- /dev/null
+++ b/docs_src/custom_executors.py
@@ -0,0 +1,15 @@
+from concurrent.futures import Executor
+
+from my_project.executor import CustomExecutor
+from pytask_parallel import ParallelBackend
+from pytask_parallel import WorkerType
+from pytask_parallel import registry
+
+
+def build_custom_executor(n_workers: int) -> Executor:
+    return CustomExecutor(
+        max_workers=n_workers, worker_type=WorkerType.PROCESSES, remote=False
+    )
+
+
+registry.register_parallel_backend(ParallelBackend.CUSTOM, build_custom_executor)
diff --git a/src/pytask_parallel/__init__.py b/src/pytask_parallel/__init__.py
index 812a622..0e32f3b 100644
--- a/src/pytask_parallel/__init__.py
+++ b/src/pytask_parallel/__init__.py
@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 from pytask_parallel.backends import ParallelBackend
+from pytask_parallel.backends import WorkerType
 from pytask_parallel.backends import registry
 
 try:
@@ -13,4 +14,4 @@
     __version__ = "unknown"
 
-__all__ = ["ParallelBackend", "__version__", "registry"]
+__all__ = ["ParallelBackend", "__version__", "registry", "WorkerType"]
diff --git a/src/pytask_parallel/config.py b/src/pytask_parallel/config.py
index 618cc38..0b6daf6 100644
--- a/src/pytask_parallel/config.py
+++ b/src/pytask_parallel/config.py
@@ -47,5 +47,5 @@ def pytask_post_parse(config: dict[str, Any]) -> None:
         return
 
     # Register parallel execute and logging hook.
-    config["pm"].register(logging)
     config["pm"].register(execute)
+    config["pm"].register(logging)
diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py
index 0884957..093ec9c 100644
--- a/src/pytask_parallel/execute.py
+++ b/src/pytask_parallel/execute.py
@@ -26,6 +26,7 @@
 from pytask_parallel.backends import registry
 from pytask_parallel.utils import create_kwargs_for_task
 from pytask_parallel.utils import get_module
+from pytask_parallel.utils import is_coiled_function
 from pytask_parallel.utils import parse_future_result
 
 if TYPE_CHECKING:
@@ -170,6 +171,30 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]:
     kwargs = create_kwargs_for_task(task, remote=remote)
 
+    if is_coiled_function(task):
+        # Prevent circular import for coiled backend.
+        from pytask_parallel.wrappers import rewrap_task_with_coiled_function
+
+        wrapper_func = rewrap_task_with_coiled_function(task)
+
+        # Task modules are dynamically loaded and added to `sys.modules`. Thus,
+        # cloudpickle believes the module of the task function is also importable in the
+        # child process. We have to register the module as dynamic again, so that
+        # cloudpickle will pickle it with the function. See cloudpickle#417, pytask#373
+        # and pytask#374.
+        task_module = get_module(task.function, getattr(task, "path", None))
+        cloudpickle.register_pickle_by_value(task_module)
+
+        return wrapper_func.submit(
+            task=task,
+            console_options=console.options,
+            kwargs=kwargs,
+            remote=True,
+            session_filterwarnings=session.config["filterwarnings"],
+            show_locals=session.config["show_locals"],
+            task_filterwarnings=get_marks(task, "filterwarnings"),
+        )
+
     if worker_type == WorkerType.PROCESSES:
         # Prevent circular import for loky backend.
         from pytask_parallel.wrappers import wrap_task_in_process
diff --git a/src/pytask_parallel/utils.py b/src/pytask_parallel/utils.py
index 0d54d7a..6cb044c 100644
--- a/src/pytask_parallel/utils.py
+++ b/src/pytask_parallel/utils.py
@@ -28,12 +28,19 @@
     from pytask_parallel.wrappers import WrapperResult
 
+try:
+    from coiled.function import Function as CoiledFunction
+except ImportError:
+
+    class CoiledFunction: ...  # type: ignore[no-redef]
+
+
 __all__ = [
     "create_kwargs_for_task",
     "get_module",
     "parse_future_result",
     "is_local_path",
+    "is_coiled_function",
 ]
@@ -165,3 +172,8 @@ def get_module(func: Callable[..., Any], path: Path | None) -> ModuleType:
 def is_local_path(path: Path) -> bool:
     """Check if a path is local."""
     return isinstance(path, (FilePath, PosixPath, WindowsPath))
+
+
+def is_coiled_function(task: PTask) -> bool:
+    """Check if a function is a coiled function."""
+    return "coiled_kwargs" in task.attributes
diff --git a/src/pytask_parallel/wrappers.py b/src/pytask_parallel/wrappers.py
index 185f819..b7f8a33 100644
--- a/src/pytask_parallel/wrappers.py
+++ b/src/pytask_parallel/wrappers.py
@@ -2,6 +2,7 @@
 
 from __future__ import annotations
 
+import functools
 import sys
 import warnings
 from contextlib import redirect_stderr
@@ -24,6 +25,7 @@
 from pytask.tree_util import tree_map_with_path
 from pytask.tree_util import tree_structure
 
+from pytask_parallel.utils import CoiledFunction
 from pytask_parallel.utils import is_local_path
 
 if TYPE_CHECKING:
@@ -80,7 +82,7 @@ def wrap_task_in_process(  # noqa: PLR0913
     show_locals: bool,
     task_filterwarnings: tuple[Mark, ...],
 ) -> WrapperResult:
-    """Unserialize and execute task.
+    """Execute a task in a spawned process.
 
     This function receives bytes and unpickles them to a task which is them execute in
     a spawned process or thread.
@@ -149,6 +151,12 @@ def wrap_task_in_process(  # noqa: PLR0913
     )
 
 
+def rewrap_task_with_coiled_function(task: PTask) -> CoiledFunction:
+    return functools.wraps(wrap_task_in_process)(
+        CoiledFunction(wrap_task_in_process, **task.attributes["coiled_kwargs"])
+    )
+
+
 def _raise_exception_on_breakpoint(*args: Any, **kwargs: Any) -> None:  # noqa: ARG001
     msg = (
         "You cannot use 'breakpoint()' or 'pdb.set_trace()' while parallelizing the "
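
The new `is_coiled_function` helper only checks for a `coiled_kwargs` entry in
`task.attributes`. A small illustration with stand-in objects, plain `SimpleNamespace`
instances instead of real pytask tasks and an invented `coiled_kwargs` value, shows the
behavior:

```python
from types import SimpleNamespace

from pytask_parallel.utils import is_coiled_function

# Stand-ins that carry only the attribute the helper inspects.
plain_task = SimpleNamespace(attributes={})
coiled_task = SimpleNamespace(attributes={"coiled_kwargs": {"memory": "8 GB"}})

assert not is_coiled_function(plain_task)
assert is_coiled_function(coiled_task)
```

Presumably, tasks decorated with `@coiled.function` receive such an entry during
collection; the `pytask_execute_task` hook above then routes them through
`rewrap_task_with_coiled_function` instead of the regular process wrapper.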