diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 3a18d3e..a3949f9 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -22,6 +22,10 @@ repos:
   - id: python-no-log-warn
   - id: python-use-type-annotations
   - id: text-unicode-replacement-char
+- repo: https://github.com/aio-libs/sort-all
+  rev: v1.2.0
+  hooks:
+  - id: sort-all
 - repo: https://github.com/astral-sh/ruff-pre-commit
   rev: v0.4.4
   hooks:
diff --git a/docs/source/api.md b/docs/source/api.md
new file mode 100644
index 0000000..e205ef5
--- /dev/null
+++ b/docs/source/api.md
@@ -0,0 +1,15 @@
+# API
+
+```{eval-rst}
+.. currentmodule:: pytask_parallel
+
+.. autoclass:: ParallelBackend
+.. autoclass:: ParallelBackendRegistry
+   :members:
+.. autoclass:: WorkerType
+.. autodata:: registry
+
+   An instantiated :class:`~pytask_parallel.ParallelBackendRegistry` to register or
+   overwrite parallel backends.
+
+```
diff --git a/docs/source/changes.md b/docs/source/changes.md
index 0adcc30..04d0afe 100644
--- a/docs/source/changes.md
+++ b/docs/source/changes.md
@@ -28,6 +28,7 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and
 - {pull}`106` fixes {pull}`99` such that only when there are coiled functions, all
   ready tasks are submitted.
 - {pull}`107` removes status from `pytask_execute_task_log_start` hook call.
+- {pull}`109` improves the documentation.
 
 ## 0.4.1 - 2024-01-12
 
diff --git a/docs/source/coiled.md b/docs/source/coiled.md
index 5463f7b..69408b5 100644
--- a/docs/source/coiled.md
+++ b/docs/source/coiled.md
@@ -58,8 +58,13 @@ pytask -n 2
 pytask --parallel-backend loky
 ```
 
+```{note}
+When you build a project using coiled, you will see a message after pytask's startup
+that coiled is creating the remote software environment, which takes 1-2 minutes.
+```
+
 When you apply the {func}`@task <pytask.task>` decorator to the task, make sure the
-{func}`@coiled.function <coiled.function>` decorator is applied first, or is closer to
+{func}`@coiled.function <coiled.function>` decorator is applied first or is closer to
 the function. Otherwise, it will be ignored. Add more arguments to the decorator to
 configure the hardware and software environment.
 
@@ -71,7 +76,7 @@ By default, {func}`@coiled.function <coiled.function>`
 to the workload. It means that coiled infers from the number of submitted tasks and
 previous runtimes, how many additional remote workers it should deploy to handle the
 workload. It provides a convenient mechanism to scale without intervention. Also,
-workers launched by {func}`@coiled.function <coiled.function>` will shutdown quicker
+workers launched by {func}`@coiled.function <coiled.function>` will shut down quicker
 than a cluster.
 
 ```{seealso}
@@ -88,7 +93,8 @@ coiled.
 Usually, it is not necessary and you are better off using coiled's serverless functions.
 
 If you want to launch a cluster managed by coiled, register a function that builds an
-executor using {class}`coiled.Cluster`.
+executor using {class}`coiled.Cluster`. Assign a name to the cluster to reuse it when
+you build your project again and the cluster has not been shut down.
 
 ```python
 import coiled
@@ -98,7 +104,11 @@ from concurrent.futures import Executor
 
 
 def _build_coiled_executor(n_workers: int) -> Executor:
-    return coiled.Cluster(n_workers=n_workers).get_client().get_executor()
+    return (
+        coiled.Cluster(n_workers=n_workers, name="coiled-project")
+        .get_client()
+        .get_executor()
+    )
 
 
 registry.register_parallel_backend(ParallelBackend.CUSTOM, _build_coiled_executor)
@@ -109,3 +119,12 @@ Then, execute your workflow with
 ```console
 pytask --parallel-backend custom
 ```
+
+## Tips
+
+When you change your project between executions while your cluster is still up and
+running, the local and the remote software environments can get out of sync. Then, you
+will see errors in remote workers that you have already fixed locally.
+
+A quick solution is to stop the cluster in the coiled dashboard and create a new one
+with the next `pytask build`.
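The decorator order required by coiled.md above is easy to get wrong, so here is a
minimal sketch of a task module. The `memory` argument and the `output.txt` product are
illustrative assumptions, not part of the patch:

```python
from pathlib import Path
from typing import Annotated

import coiled
from pytask import task


@task
@coiled.function(memory="8 GiB")  # Applied first, i.e., closest to the function.
def task_example() -> Annotated[str, Path("output.txt")]:
    # Runs on a remote coiled worker; the return value is stored locally.
    return "Hello from coiled."
```

Swapping the two decorators would silently disable the coiled function, as the docs
note.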
diff --git a/docs/source/custom_executors.md b/docs/source/custom_executors.md
index 715da68..cc7d341 100644
--- a/docs/source/custom_executors.md
+++ b/docs/source/custom_executors.md
@@ -1,33 +1,35 @@
 # Custom Executors
 
-```{caution}
-The interface for custom executors is rudimentary right now. Please, give some feedback
-if you managed to implement a custom executor or have suggestions for improvement.
-
-Please, also consider contributing your executor to pytask-parallel if you believe it
+```{note}
+Please consider contributing your executor to pytask-parallel if you believe it
 could be helpful to other people. Start by creating an issue or a draft PR.
 ```
 
-pytask-parallel allows you to use your parallel backend as long as it follows the
-interface defined by {class}`~concurrent.futures.Executor`.
+pytask-parallel allows you to use any parallel backend as long as it follows the
+interface defined by {class}`concurrent.futures.Executor`.
 
 In some cases, adding a new backend can be as easy as registering a builder function
-that receives some arguments (currently only `n_workers`) and returns the instantiated
-executor.
+that receives `n_workers` and returns the instantiated executor.
+
+```{important}
+Place the following code in any module that will be imported when you are executing
+pytask. For example, the `src/project/config.py` in your project, the
+`src/project/__init__.py`, or the task module directly.
+```
 
 ```{literalinclude} ../../docs_src/custom_executors.py
 ```
 
-Given {class}`pytask_parallel.WorkerType` pytask applies automatic wrappers around the
-task function to collect tracebacks, capture stdout/stderr and their like. The `remote`
-keyword allows pytask to handle local paths automatically for remote clusters.
+Given the optional {class}`~pytask_parallel.WorkerType`, pytask applies automatic
+wrappers around the task function to collect tracebacks, capture stdout/stderr, and
+the like. Possible values are `WorkerType.PROCESSES` (default) or `WorkerType.THREADS`.
 
-Now, build the project requesting your custom backend.
+The `remote` keyword signals to pytask that tasks run on remote workers without
+access to the local filesystem. pytask will then automatically sync local files to the
+workers. By default, pytask assumes workers have access to the local filesystem.
+
+Now, build the project with your custom backend.
 
 ```console
 pytask --parallel-backend custom
 ```
-
-```{important}
-pytask applies automatic wrappers
-```
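To make the `worker_type` and `remote` keywords concrete, here is a sketch of a
thread-based registration. `ThreadPoolExecutor` is a stand-in for a real custom
executor and is an assumption, not part of the patch; the keyword arguments mirror the
`docs_src/custom_executors.py` change further below:

```python
from concurrent.futures import Executor, ThreadPoolExecutor

from pytask_parallel import ParallelBackend, WorkerType, registry


def build_thread_executor(n_workers: int) -> Executor:
    # Any object satisfying the concurrent.futures.Executor interface works.
    return ThreadPoolExecutor(max_workers=n_workers)


registry.register_parallel_backend(
    ParallelBackend.CUSTOM,
    build_thread_executor,
    # Use the thread wrapper and tell pytask the workers are local.
    worker_type=WorkerType.THREADS,
    remote=False,
)
```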
diff --git a/docs/source/dask.md b/docs/source/dask.md
index 0f023ea..0e267e2 100644
--- a/docs/source/dask.md
+++ b/docs/source/dask.md
@@ -8,15 +8,15 @@ package due to how pytask imports your code and dask serializes task functions
 
 Dask is a flexible library for parallel and distributed computing. You probably know it
 from its {class}`dask.dataframe` that allows lazy processing of big data. Here, we use
-{mod}`distributed` that provides an interface similar to
-{class}`~concurrent.futures.Executor` to parallelize our execution.
+distributed, which provides an interface similar to {class}`concurrent.futures.Executor`
+to parallelize our execution.
 
-There are a couple of ways in how we can use dask.
+There are a couple of ways in which we can use dask.
 
 ## Local
 
-By default, using dask as the parallel backend will launch a
-{class}`distributed.LocalCluster` with processes on your local machine.
+Using dask as the parallel backend will launch a {class}`distributed.LocalCluster` with
+processes on your local machine.
 
 `````{tab-set}
 ````{tab-item} CLI
@@ -53,10 +53,13 @@ terminals to launch as many dask workers as you like with
 dask worker
 ```
 
-Finally, write a function to build the dask client and register it as the dask backend.
-Place the code somewhere in your codebase, preferably, where you store the main
-configuration of your project in `config.py` or another module that will be imported
-during execution.
+Finally, write a function to build the dask client and register it as the backend.
+
+```{important}
+Place the following code in any module that will be imported when you are executing
+pytask. For example, the `src/project/config.py` in your project, the
+`src/project/__init__.py`, or the task module directly.
+```
 
 ```python
 from pytask_parallel import ParallelBackend
@@ -73,7 +76,7 @@ registry.register_parallel_backend(ParallelBackend.DASK, _build_dask_executor)
 ```
 
 You can also register it as the custom executor using
-{class}`pytask_parallel.ParallelBackend.CUSTOM` to switch back to the default dask
+{obj}`pytask_parallel.ParallelBackend.CUSTOM` to switch back to the default dask
 executor quickly.
 
 ```{seealso}
@@ -84,7 +87,7 @@ You can find more information in the documentation for
 ## Remote
 
 You can learn how to deploy your tasks to a remote dask cluster in
-[this guide](https://docs.dask.org/en/stable/deploying.html). They recommend to use
+[this guide](https://docs.dask.org/en/stable/deploying.html). They recommend using
 coiled for deployment to cloud providers.
 
 [coiled](https://www.coiled.io/) is a product built on top of dask that eases the
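The body of `_build_dask_executor` falls between the two hunks above and is not shown.
A sketch of what such a builder could look like, assuming the scheduler from the
console block is reachable at dask's default address (the address is a placeholder):

```python
from concurrent.futures import Executor

from distributed import Client

from pytask_parallel import ParallelBackend, registry


def _build_dask_executor(n_workers: int) -> Executor:
    # Connect to the scheduler started with `dask scheduler` and expose the
    # Executor-like interface of the client.
    return Client(address="tcp://127.0.0.1:8786").get_executor()


registry.register_parallel_backend(ParallelBackend.DASK, _build_dask_executor)
```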
diff --git a/docs/source/index.md b/docs/source/index.md
index a16ed8b..797108c 100644
--- a/docs/source/index.md
+++ b/docs/source/index.md
@@ -27,6 +27,7 @@ coiled
 dask
 custom_executors
 remote_backends
+api
 developers_guide
 changes
 On Github <https://github.com/pytask-dev/pytask-parallel>
diff --git a/docs/source/remote_backends.md b/docs/source/remote_backends.md
index 3a2ffbe..f5f6024 100644
--- a/docs/source/remote_backends.md
+++ b/docs/source/remote_backends.md
@@ -21,12 +21,6 @@ to run their projects.
 
 ## Local files
 
-Avoid using local files with remote backends and use storages like S3 for dependencies
-and products. The reason is that every local file needs to be send to the remote workers
-and when your internet connection is slow you will face a hefty penalty on runtime.
-
-## Local paths
-
 In most projects you are using local paths to refer to dependencies and products of
 your tasks. This becomes an interesting problem with remote workers since your local
 files are not necessarily available in the remote machine.
diff --git a/docs_src/custom_executors.py b/docs_src/custom_executors.py
index 3e13527..050461a 100644
--- a/docs_src/custom_executors.py
+++ b/docs_src/custom_executors.py
@@ -7,9 +7,13 @@
 
 
 def build_custom_executor(n_workers: int) -> Executor:
-    return CustomExecutor(
-        max_workers=n_workers, worker_type=WorkerType.PROCESSES, remote=False
-    )
+    return CustomExecutor(max_workers=n_workers)
 
 
-registry.register_parallel_backend(ParallelBackend.CUSTOM, build_custom_executor)
+registry.register_parallel_backend(
+    ParallelBackend.CUSTOM,
+    build_custom_executor,
+    # Optional defaults.
+    worker_type=WorkerType.PROCESSES,
+    remote=False,
+)
diff --git a/src/pytask_parallel/__init__.py b/src/pytask_parallel/__init__.py
index 0e32f3b..c21274f 100644
--- a/src/pytask_parallel/__init__.py
+++ b/src/pytask_parallel/__init__.py
@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 from pytask_parallel.backends import ParallelBackend
+from pytask_parallel.backends import ParallelBackendRegistry
 from pytask_parallel.backends import WorkerType
 from pytask_parallel.backends import registry
 
@@ -14,4 +15,10 @@
     __version__ = "unknown"
 
 
-__all__ = ["ParallelBackend", "__version__", "registry", "WorkerType"]
+__all__ = [
+    "ParallelBackend",
+    "ParallelBackendRegistry",
+    "WorkerType",
+    "__version__",
+    "registry",
+]
diff --git a/src/pytask_parallel/backends.py b/src/pytask_parallel/backends.py
index 7fb947d..582833b 100644
--- a/src/pytask_parallel/backends.py
+++ b/src/pytask_parallel/backends.py
@@ -83,10 +83,26 @@ def _get_thread_pool_executor(n_workers: int) -> Executor:
 
 
 class ParallelBackend(Enum):
-    """Choices for parallel backends."""
+    """Choices for parallel backends.
+
+    Attributes
+    ----------
+    NONE
+        No parallel backend.
+    CUSTOM
+        A custom parallel backend.
+    DASK
+        A dask parallel backend.
+    LOKY
+        A loky parallel backend.
+    PROCESSES
+        A process pool parallel backend.
+    THREADS
+        A thread pool parallel backend.
+
+    """
 
     NONE = "none"
-
     CUSTOM = "custom"
     DASK = "dask"
     LOKY = "loky"
@@ -95,7 +111,16 @@ class ParallelBackend(Enum):
 
 
 class WorkerType(Enum):
-    """A type for workers that either spawned as threads or processes."""
+    """A type for workers that are either spawned as threads or processes.
+
+    Attributes
+    ----------
+    THREADS
+        Workers are threads.
+    PROCESSES
+        Workers are processes.
+
+    """
 
     THREADS = "threads"
     PROCESSES = "processes"
diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py
index 1a6b61b..ec922a4 100644
--- a/src/pytask_parallel/execute.py
+++ b/src/pytask_parallel/execute.py
@@ -239,6 +239,7 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]:
         show_locals=session.config["show_locals"],
         task_filterwarnings=get_marks(task, "filterwarnings"),
     )
+
     if worker_type == WorkerType.THREADS:
         # Prevent circular import for loky backend.
         from pytask_parallel.wrappers import wrap_task_in_thread
@@ -246,6 +247,7 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]:
         return session.config["_parallel_executor"].submit(
             wrap_task_in_thread, task=task, remote=False, **kwargs
         )
+
     msg = f"Unknown worker type {worker_type}"
     raise ValueError(msg)
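Taken together, the public surface this patch exports can be smoke-tested in a REPL; a
minimal check, assuming the patched pytask-parallel is installed:

```python
from pytask_parallel import (
    ParallelBackend,
    ParallelBackendRegistry,
    WorkerType,
    registry,
)

# `registry` is the instantiated ParallelBackendRegistry documented in api.md.
assert isinstance(registry, ParallelBackendRegistry)
print([b.value for b in ParallelBackend])  # none, custom, dask, loky, processes, threads
print([w.value for w in WorkerType])  # threads, processes
```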