Sometimes the answer to our scaling problem isn’t throwing more computers at it; it’s throwing different types of resources at it. One example of this might be ten thousand monkeys trying to reproduce the works of Shakespeare, versus one Shakespeare.[1] While performance varies, some benchmarks have shown up to an 85% improvement in model training times when using GPUs over CPUs. Continuing its modular tradition, the GPU logic of Dask is found in the libraries and ecosystem surrounding it. The libraries can either run on a collection of GPU workers or parallelize work over different GPUs on one host.
Most work we do on the computer is done on the CPU. GPUs were created for displaying video but involve doing large amounts of vectorized floating point (e.g., non-integer) operations. With vectorized operations, the same operation is applied in parallel on large sets of data, like a map
. Tensor Processing Units (TPUs) are similar to GPUs, except without also being used for graphics.
For our purposes, in Dask, we can think of GPUs and TPUs as specializing in offloading large vectorized computations, but there are many other kinds of accelerators. While much of this chapter is focused on GPUs, the same general techniques, albeit with different libraries, generally apply to other accelerators. Other kinds of specialized resources include NVMe drives, faster (or larger) RAM, TCP/IP offload, Just-a-Bunch-of-Disks expansion ports, and Intel’s OPTAIN memory. Special resources/accelerators can improve everything from network latency to writing large files to disk. What all these share is that Dask has no built-in understanding of these resources, and it’s up to you to provide that information to the Dask scheduler and also take advantage of it.
This chapter will look at the current state of accelerated analytics in Python and how to use these tools together with Dask. You will learn what kinds of problems are well suited to GPU acceleration, a bit about other kinds of accelerators, and how to apply this knowledge to your problems.
Warning
|
Cloud accounts and machines with access to GPUs are especially of interest to less-than-savory folks on the internet due to the relative ease of mining cryptocurrency. If you are used to working with only public data and lax security controls, take this as an opportunity to review your security process and restrict runtime access to only those who need it. Or be prepared for a really large cloud bill. |
Accelerators largely break down into two categories: transparent (no code or change required) and non-transparent optimizers. Whether an accelerator is transparent or not largely depends on whether someone below us in the stack has made it transparent to us.
TCP/IP offloading is generally transparent at the user space level, which means the operating system takes care of it for us. NVMe drives are also generally transparent, generally appearing the same as spinning disks, except faster. It is still important to make Dask aware of transparent optimizers; for example, a disk-intensive workload should be scheduled on the machines with faster disks.
The non-transparent accelerators include GPUs, Optane, QAT, and many more. Using these requires changing our code to be able to take advantage of them. Sometimes this can be as simple as swapping in a different library, but not always. Many non-transparent accelerators require either copying our data or special formatting to be able to operate. This means that if an operation is relatively fast, moving to an optimizer could make it slower.
Not every problem is a good fit for GPU acceleration. GPUs are especially good at performing the same calculation on a large number of data points at the same time. If a problem is well suited to vectorized computation, then there is a good chance that GPUs may be well suited to it.
Some common problems that benefit from GPU acceleration include:
-
Machine learning
-
Linear algebra
-
Physics simulations
-
Graphics (no surprise here)
GPUs are not well suited to branch-heavy non-vectorized workflows, or workflows where the cost of copying the data is similar to or higher than the cost of the computation.
If you have decided that your problem is well suited to a specialized resource, the next step is to make the scheduler aware of which machines and processes have the resource. You can do this by adding either an environment variable or a command-line flag to the worker launch (e.g., --resources "GPU=2"
or DASK_DISTRIBUTED__WORKER__RESOURCES__GPU=2
).
For NVIDIA users, the dask-cuda
package can launch one worker per GPU, pinning the GPU and thread together for performance. For example, on our Kubernetes cluster with GPU resources, we configure the workers to use the dask-cuda-worker
launcher, as shown in Using the dask-cuda-worker
package in the Dask Kubernetes template.
dask-cuda-worker
package in the Dask Kubernetes templatelink:./examples/dask/DaskKubeGPUs.py[role=include]
Here you see we still add the --resources
flag so that in a mixed environment we can select just the GPU workers.
If you’re using Dask to schedule work on multiple GPUs on a single computer (e.g., using Dask local mode with CUDA), the same dask-cuda
package provides a LocalCUDACluster
. As with dask-cuda-worker
, you still need to add the resource tag manually, as shown in LocalCUDACluster
with resource tagging, but it launches the correct workers and pins them to threads.
LocalCUDACluster
with resource tagginglink:./examples/dask/DaskGPUs.py[role=include]
Note
|
For homogenous clusters it may seem tempting to avoid labeling these resources, but unless you will always have a 1:1 mapping of worker process/thread to the accelerator (or the accelerator can be used by all workers at the same time), it is still beneficial to label these resources. This is important for non-shareable (or difficult-to-share) resources like GPUs/TPUs since Dask might schedule two tasks trying to access the GPU. But for shareable resources like NVMe drives, or TCP/IP offloading, if it’s present on every node in the cluster and will always be, you can probably skip it. |
It’s important to note that Dask does not manage custom resources (including GPUs). If another process uses all of the GPU cores without asking Dask, there is no protection for this. In some ways, this is reminiscent of early computing, where we had "cooperative" multi-tasking; we depend on our neighbors being well behaved.
Warning
|
Dask depends on well-behaved Python code, which does not use resources it has not asked for and releases the resources when finished. This most commonly happens with memory leaks (both accelerated and not), often with specialized libraries like CUDA that allocate memory outside of Python. These libraries often have special steps you need to call when you are done with the task you’ve asked to make the resources available for others. |
Now that Dask is aware of the special resources on your cluster, it’s time to make sure that your code can take advantage of them. Often, but not always, these accelerators will require some kind of special library to be installed, which may involve long compile times. When possible, installing the acceleration libraries from conda and pre-installing on the workers (in the container or on the host) can help minimize this overhead.
For Kubernetes (or other Docker container users), you can do this by making a custom container with the accelerator libraries pre-installed, as seen in Pre-installing cuDF.
link:./examples/dask/preinstall_numba/Dockerfile[role=include]
Then, to build this, we run the script shown in Building custom Dask Docker containers.
link:./examples/dask/preinstall_numba/build.sh[role=include]
It is important that you make sure your tasks that need accelerators run on worker processes with the accelerator available. You can ask for special resources when scheduling tasks with Dask, either explicitly in client.submit
, as seen in Submitting a task asking for a GPU, or by adding an annotation to your existing code, as seen in Annotating a group of operations as needing a GPU.
link:./examples/dask/DaskGPUs.py[role=include]
link:./examples/dask/DaskGPUs.py[role=include]
If you move from a cluster with GPU resources to a cluster without, this code will hang indefinitely. The CPU Fallback design pattern covered later can mitigate this.
Numba is a popular high-performance JIT compilation library for Python, which also has support for various accelerators. Most JIT code, as well as many decorator functions, is generally not directly serializable, so attempting to directly Numba it with dask.submit
, as seen in Decorator difficulty, does not work. Instead, the correct way is to wrap the function, as shown in Decorator hack.
link:./examples/dask/DaskGPUs.py[role=include]
link:./examples/dask/DaskGPUs.py[role=include]
Note
|
Decorator difficulty will work in local mode—but not when you go to scale. |
Like most tasks in Python, there are many different libraries for working with GPUs. Many of these libraries support NVIDIA’s Compute Unified Device Architecture (CUDA) with experimental support for AMD’s new open HIP/Radeon Open Compute module (ROCm) interfaces. NVIDIA and CUDA were the first on the scene and have a much larger adoption than AMD’s Radeon Open Compute module—so much so that ROCm has a large focus on supporting ports of CUDA software to the ROCm platform.
We won’t dive deep into the world of Python GPU libraries, but you may want to check out Numba for GPUs, TensorFlow GPU support, and PyTorch's GPU support.
Most of the libraries that have some form of GPU support require compiling large amounts of non-Python code. As such, it’s often best to install these libraries with conda, which frequently has more complete binary packaging, allowing you to skip the compile step.
The three main CUDA libraries extending Dask are cuDF (previously called dask-cudf), BlazingSQL, and cuML.[2] Currently these libraries are focused on NVIDIA GPUs.
Note
|
Dask does not currently have any libraries powering integrations with OpenCL or HIP. This does not preclude you in any way from using GPUs with libraries that support them, like TensorFlow, as previously illustrated. |
cuDF is a GPU-accelerated version of Dask’s DataFrame library. Some benchmarking shows performance speed-ups of 7x~50x. Not all DataFrame operations will have this same speed-up. For example, if you are operating row-by-row instead of in vectorized type operations, you may experience slower performance when using cuDF over Dask’s DataFrame library. cuDF supports most of the common data types you are likely to use, but not all.
Note
|
Under the hood, cuDF frequently delegates work to the cuPY library, but since it is created by NVIDIA employees and their focus is on supporting NVIDIA hardware, cuDF does not have direct support for ROCm. |
BlazingSQL uses GPU acceleration to provide super-fast SQL queries. BlazingSQL operates on top of cuDF.
Note
|
While BlazingSQL is a wonderful tool, much of its documentation is broken. For example, at the time of this writing, none of the examples linked in the main README resolve correctly, and the documentation site is entirely offline. |
Another GPU-accelerated library for streaming on GPUs is cuStreamz, which is basically a combination of Dask streaming and cuDF; we cover it more in [appD].
Allocating memory on GPUs tends to be slow, so many libraries hold on to these resources. In most situations, if the Python VM exits, the resources will be cleared up. An option of last resort is to bounce all of the workers using client.restart
. When possible, you will be best served by manually managing resources—which is library-dependent. For example, cuPY users can free the blocks once used by calling free_all_blocks()
, as per the memory management documentation.
CPU Fallback refers to attempting to use an accelerator, like GPU or TPU, and falling back to the regular CPU code path if the accelerator is unavailable. In most cases, this is a good design pattern to follow, as accelerators (like GPUs) can be expensive and may not always be available. However, in some cases, the difference between CPU and GPU performance is so large that falling back to the CPU is unlikely to be able to succeed in a practical amount of time; this occurs most often with deep learning algorithms.
Object-oriented programming and duck-typing are somewhat well suited to this design pattern, since, provided that two classes implement the same parts of the interface you are using, you can swap them around. However, much like swapping in Dask DataFrames for pandas DataFrames, it is imperfect, especially when it comes to performance.
Warning
|
In a better world, we could submit a task requesting GPU resources, and if that does not get scheduled, we could switch back to CPU-only resources. Unfortunately, Dask’s resources scheduling is closer to "best effort,"[3] so we may be scheduled on nodes without the resources we request. |
Specialized accelerators, like GPUs, can make large differences in your workflows. Picking the right accelerator for your workflow is important, and some workflows are not well suited to acceleration. Dask does not automate the usage of any accelerators, but there are various libraries that you can use for GPU computation. Many of these libraries were not created with the idea of shared computation in mind, so it’s important to be on the lookout for accidental resource leaks, especially since GPU resources tend to be more expensive.