Now that you’ve run your first few tasks with Dask, it’s time to learn a little bit about what’s happening behind the scenes. Depending on whether you are using Dask locally or in a distributed fashion, the behavior can be a little different. While Dask does a good job of abstracting away many of the details of running on multiple threads or servers, having a solid grasp of how Dask is working will help you better decide both how and when to use it.
To be familiar with Dask, you need to understand:

- The deployment frameworks that Dask is able to run on, and their strengths and weaknesses
- The types of data that Dask is able to read, and how you can interact with Dask’s data types
- The computational patterns of Dask, and how to turn your ideas into Dask code
- How to monitor and troubleshoot Dask programs
In this chapter, we will introduce each of these concepts, and we will expand upon them in the rest of the book.
Dask has many different execution backends, but we find it easiest to think about them in two groups, local and distributed. With local backends, you are limited in scale to what a single computer can handle. Local backends also have advantages like the avoidance of network overhead, simpler library management, and a lower dollar cost.[1] Dask’s distributed backend has many options for deployment, from cluster managers such as Kubernetes to job queue–like systems.
Dask’s three local backends are single-process, multi-threaded, and multi-process. The single-process backend has no parallelism and is mostly useful for validating that a problem is caused by concurrency. Multi-threaded and multi-process backends are ideal for problems in which the data is small or the cost of copying it would be higher than computation time.
Tip
If you don’t configure a specific local backend, Dask will pick the backend based on the library you are working with.
The local multi-threaded scheduler avoids the costs of serializing data and communicating between processes. The multi-threaded backend is best suited for tasks in which the majority of the computation happens in native code outside of Python, which is the case for many numeric libraries such as pandas and NumPy. If that is the case for you, you can configure Dask to use multi-threading, as shown in Configuring Dask to use multi-threading.
link:./examples/dask/Dask-Ch3-Concepts.py[role=include]
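If you don’t have the linked example handy, a minimal sketch of this configuration, using Dask’s standard config API and a small bag purely for illustration, might look like the following:

[source,python]
----
import dask
import dask.bag as db

# Use the local threaded scheduler for this computation only.
with dask.config.set(scheduler="threads"):
    total = db.from_sequence(range(100)).sum().compute()

# Or set it globally for everything that follows.
dask.config.set(scheduler="threads")
----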
The local multi-process backend, shown in Configuring Dask to use the multi-process backend, has some additional overhead over multi-threaded, although it can be decreased on Unix and Unix-like systems.[2] The multi-process backend is able to avoid Python’s global interpreter lock by launching separate processes. Launching a new process is more expensive than a new thread, and Dask needs to serialize data that moves between processes.[3]
link:./examples/dask/Dask-Ch3-Concepts.py[role=include]
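A comparable sketch for the multi-process backend, again using a small illustrative bag, might look like this:

[source,python]
----
import dask
import dask.bag as db

# Use the local multi-process scheduler; data moving between
# processes is serialized, so keep the payloads small.
with dask.config.set(scheduler="processes"):
    total = db.from_sequence(range(100)).sum().compute()
----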
If you are running on a Unix system, you can use the forkserver, shown in Configuring Dask to use the multi-process forkserver, which will reduce the overhead of starting each Python interpreter. Using the forkserver will not reduce the communication overhead.
link:./examples/dask/Dask-Ch3-Concepts.py[role=include]
This optimization is generally not available on Windows.
Dask’s local backends are designed for performance rather than for testing that your code will work on a distributed scheduler. To test that your code will run remotely, you should use Dask’s distributed scheduler with a LocalCluster instead.
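A minimal sketch of testing against the distributed scheduler locally, using LocalCluster and Client from dask.distributed, might look like the following (the worker counts here are arbitrary):

[source,python]
----
from dask.distributed import Client, LocalCluster

# A LocalCluster runs the distributed scheduler and workers on your own
# machine, so your code exercises the same code paths as a real cluster.
cluster = LocalCluster(n_workers=2, threads_per_worker=2)
client = Client(cluster)
print(client.dashboard_link)
----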
While Dask can work well locally, its true power comes with the distributed scheduler, with which you can scale your problem to multiple machines. Since there are physical and financial limits to how much computing power, storage, and memory can be put into one machine, using multiple machines is often the most cost-efficient solution (and it is sometimes the only solution). Distributed computing is not without its drawbacks; as Leslie Lamport famously said, "A distributed system is one in which the failure of a computer you didn’t even know existed can render your own computer unusable." While Dask does much to limit these failures (see [fault_tolerance_ch03_1688739701115]), you accept some increase in complexity when moving to a distributed system.
Dask has one distributed scheduler backend, and it talks to many different types of clusters, including a LocalCluster. Each type of cluster is supported by its own library, which schedules the scheduler[4] that the Dask client then connects to. Using the distributed abstraction, dask.distributed, gives you portability between any types of cluster you may be using at the moment, including local. If you don’t use dask.distributed, Dask can still run perfectly well on a local computer, in which case you are using the default single-machine scheduler provided in the Dask library.
The Dask client is your entry point into the Dask distributed scheduler. In this chapter, we will be using Dask with a Kubernetes cluster; if you have another type of cluster or want details, please see [ch12].
With auto-scaling, Dask can increase or decrease the computers/resources being used, based on the tasks you have asked it to run.[5] For example, if you have a program that computes complex aggregations using many computers but then mostly operates on the aggregated data, the number of computers you need could decrease by a large amount post-aggregation. Many workloads, including machine learning, do not need the same amount of resources/computers the entire time.
Some of Dask’s cluster backends, including Kubernetes, support auto-scaling, which Dask calls adaptive deployments. Auto-scaling is useful mostly in situations of shared cluster resources, or when running on cloud providers where the underlying resources are paid for by the hour.
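As an illustration, an adaptive deployment on Kubernetes might be configured roughly as follows; the exact KubeCluster constructor arguments depend on your dask-kubernetes version and cluster setup (see [ch12]), and the scaling bounds here are arbitrary:

[source,python]
----
from dask.distributed import Client
from dask_kubernetes.operator import KubeCluster  # assumes dask-kubernetes is installed

# Constructor arguments depend on your cluster; this is a sketch.
cluster = KubeCluster(name="example-cluster")

# Adaptive deployment: Dask scales between 1 and 10 workers
# based on the work you submit.
cluster.adapt(minimum=1, maximum=10)
client = Client(cluster)
----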
Dask’s client is not fault tolerant, so while Dask can handle the failures of its workers, if the connection between the client and the scheduler is broken, your application will fail. A common workaround for this is scheduling the client within the same environment as the scheduler, although this does somewhat reduce the usefulness of having the client and scheduler as separate components.
Part of what makes Dask so powerful is the Python ecosystem it is part of. While Dask will pickle, or serialize (see Serialization and Pickling), and send your code to the workers, this doesn’t include the libraries you use.[6] To take advantage of that ecosystem, you need to be able to use additional libraries on the workers. During the exploration phase, it is common to install packages at runtime as you discover that you need them.
The PipInstall worker plug-in takes a list of packages and installs them at runtime on all of the workers. Looking back at [web_crawler_ch02_1688747981454], to install bs4 you would call distributed.diagnostics.plugin.PipInstall(["bs4"]). Any new workers that Dask launches then need to wait for the package to be installed. The PipInstall plug-in is ideal for quick prototyping while you are still discovering which packages you need. You can think of PipInstall as the replacement for !pip install in a notebook, rather than as a substitute for a properly managed virtualenv.
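As a rough sketch, registering the plug-in from [web_crawler_ch02_1688747981454] might look like the following; the pip_options argument is optional:

[source,python]
----
from dask.distributed import Client
from distributed.diagnostics.plugin import PipInstall

client = Client()  # or a client connected to your cluster

# Install bs4 on every current and future worker at runtime.
plugin = PipInstall(packages=["bs4"], pip_options=["--upgrade"])
client.register_worker_plugin(plugin)
----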
To avoid the slowness of having to install packages each time a new worker is launched, you should try to pre-install your libraries. Each cluster manager (e.g., YARN, Kubernetes, Coiled, Saturn, etc.) has its own methods for managing dependencies. This can happen at runtime or at setup where the packages are pre-installed. The specifics for the different cluster managers are covered in [ch12].
With Kubernetes, for example, the default startup script checks for the presence of some key environment variables (EXTRA_APT_PACKAGES, EXTRA_CONDA_PACKAGES, and EXTRA_PIP_PACKAGES), which, in conjunction with customized worker specs, can be used to add dependencies at runtime. Some cluster managers, like Coiled and Kubernetes, allow you to add dependencies when building an image for your workers. Others, like YARN, use packed conda or virtual environments that are set up ahead of time.
Warning
It is very important to have the same versions of Python and libraries installed on all of the workers and your client. Different versions of libraries can lead to outright failures or subtler data correctness issues.
One of your first stops in understanding what your program is doing should be Dask’s Diagnostics UI. The UI allows you to see what Dask is executing, the number of worker threads/processes/computers, memory utilization information, and much more. If you are running Dask locally, you will likely find the UI at http://localhost:8787.
If you’re using the Dask client to connect to a cluster, the UI will be running on the scheduler node. You can get the link to the dashboard from client.dashboard_link.
Tip
For remote notebook users, the hostname of the scheduler node may not be reachable directly from your computer. One option is to use the Jupyter proxy to reach the dashboard through your notebook server.
The Dask UI (digital, color version) shows the dashboard during the running of the examples in this chapter.
The UI allows you to see what Dask is doing and what’s being stored on the workers and to explore the execution graph. We will revisit the execution graph in visualize.
Distributed and parallel systems depend on serialization, sometimes called pickling in Python, to share data and functions/code between processes. Dask uses a mixture of serialization techniques to match the use case and provides hooks to extend by class when the defaults don’t meet your needs.
Warning
We most often think of serialization when it fails (with an error), but equally important can be situations where we end up serializing more data than we need, or when the amount of data that needs to be transferred is so large that distributing the work is no longer beneficial.
Cloudpickle serializes the functions and the generic Python types in Dask. Most Python code doesn’t depend on serializing functions, but cluster computing often does. Cloudpickle is a project designed for cluster computing and is able to serialize and deserialize more functions than Python’s built-in pickle.
Warning
Dask has its own ability to extend serialization, but the registry methods are not automatically sent to the workers, and it’s not always used.[7]
Dask has built-in special handling for NumPy arrays, sparse, and CuPy. These serializations tend to be more space efficient than the default serializers. When you make a class that contains one of these types and does not require any special initialization, you should call register_generic(YourClass) from dask.distributed.protocol to take advantage of Dask’s special handling.
If you have a class that is not serializable (as in the example Dask fails to serialize), you can wrap it to add serialization functions to it, as shown in Custom serialization.
link:./examples/dask/Dask-Ch3-Concepts.py[role=include]
link:./examples/dask/Dask-Ch3-Concepts.py[role=include]
If you control the original class, you can also directly add __getstate__/__setstate__ methods instead of wrapping it.
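A sketch of that pattern, using a hypothetical Connection class whose live handle can’t be pickled, might look like this:

[source,python]
----
class Connection:
    """A class whose live handle can't be pickled directly."""

    def __init__(self, host):
        self.host = host
        self.conn = self._connect(host)  # hypothetical non-serializable handle

    def _connect(self, host):
        ...  # open a socket, database handle, etc.

    def __getstate__(self):
        # Only ship the information needed to rebuild the object.
        return {"host": self.host}

    def __setstate__(self, state):
        self.host = state["host"]
        self.conn = self._connect(self.host)
----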
Note
Dask automatically attempts to compress serialized data, which generally improves performance. You can disable compression in Dask’s configuration if it isn’t helping your workload.
Partitioning gives you the ability to control the number of tasks used to process your data. If you have billions of rows, using one task for each row would mean you spend more time scheduling the tasks than doing the work itself. Understanding partitioning is key to being able to make the most efficient use of Dask.
Dask uses slightly different terminology for partitioning in each of its collections. In Dask, partitioning impacts how data is located on your cluster, and the right partitioning for your problem can make order-of-magnitude improvements. Partitioning has a few different aspects, like the size of each partition, the number of partitions, and optional properties such as partition key and sorted versus unsorted.
The number and size of partitions are closely related and impact the maximum parallelism. Partitions that are too small, and therefore too numerous, mean that Dask will spend more time scheduling the tasks than running them. A general sweet spot for partition size is around 100 MB to 1 GB, but if your computation per element is very expensive, smaller partition sizes can perform better.
Ideally, your partitions should be similar in size to avoid stragglers. A situation in which you’ve got partitions of different sizes is called skewed. There are many different sources of skew, ranging from input file sizes to key skew (when keyed). When your data gets too skewed, you will need to repartition the data.
Tip
The Dask UI is a great place to see if you might have stragglers.
Dask arrays' partitions are called chunks and represent the number of elements in each block. Although Dask always knows the number of chunks, when you apply a filter or load data, Dask can become unaware of the size of each chunk. Indexing or slicing a Dask array requires that Dask know the chunk sizes so it can find the chunk(s) with the desired elements. Depending on how your Dask array was created, Dask may or may not know the size of each chunk. We talk about this more in [ch05]. If you want to index into an array where Dask does not know the chunk sizes, you will need to first call compute_chunk_sizes() on the array. When creating a Dask array from a local collection, you can specify the target chunk size, as shown in Custom array chunk size.
link:./examples/dask/Dask-Ch3-Concepts.py[role=include]
Partitions/chunking doesn’t have to be static; the rechunk function allows you to change the chunk size of a Dask array.
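A small sketch tying these pieces together (chunk sizes at creation, compute_chunk_sizes before indexing, and rechunk) might look like the following:

[source,python]
----
import numpy as np
import dask.array as da

# Create an array with roughly 100 elements per chunk.
arr = da.from_array(np.arange(1_000), chunks=100)

# After operations like boolean filtering, Dask no longer knows
# the chunk sizes; compute them before indexing.
filtered = arr[arr % 3 == 0]
filtered.compute_chunk_sizes()
first = filtered[0].compute()

# rechunk changes the chunk size of an existing array.
bigger_chunks = arr.rechunk(250)
----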
Dask bags' partitions are simply called partitions. Unlike Dask arrays, Dask bags do not support indexing, so Dask does not track the number of elements in each partition. When you use scatter, Dask will try to partition the data as evenly as possible, but subsequent operations can change the number of elements inside each partition. Similar to Dask arrays, when creating a bag from a local collection, you can specify the number of partitions, except the parameter is called npartitions instead of chunks.
You can change the number of partitions in a bag by calling repartition with either npartitions (for a fixed number of partitions) or partition_size (for a target size of each partition). Specifying partition_size is more expensive, since Dask needs to do some extra computation to determine the matching number of partitions.
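For example, a hedged sketch of both forms of repartitioning a bag might look like this (the sizes and counts are arbitrary):

[source,python]
----
import dask.bag as db

# Create a bag with ten partitions.
bag = db.from_sequence(range(10_000), npartitions=10)

# Fixed number of partitions.
smaller = bag.repartition(npartitions=5)

# Or aim for a target size per partition (more expensive to compute).
sized = bag.repartition(partition_size="10MB")
----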
You can think of data as keyed when there is an index or when the data can be looked up by a value, such as in a hashtable. While bag implements keyed operations like groupBy, where values with the same key are combined, its partitioning is not key-aware, so keyed operations always operate across all partitions.[8]
DataFrames have the most options for partitioning. DataFrames can have partitions of different sizes, as well as known or unknown partitioning. With unknown partitioning, the data is distributed, but Dask is unable to determine which partition holds a particular key. Unknown partitioning happens often, as any operation that could change the value of a key results in unknown partitioning. The known_divisions property on a DataFrame allows you to see whether Dask knows the partitioning, and the index property shows the splits used and the column.
If a DataFrame has the right partitioning, operations like groupBy, which would normally involve a lot of internode communication, can be executed with less communication. Accessing rows by ID requires that the DataFrame be partitioned on that key. If you want to change the column that your DataFrame is partitioned on, you can call set_index to change the index. Setting the index, like all repartitioning operations, involves copying the data between workers, known as a shuffle.
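A small illustrative sketch, using a hypothetical user_id column, might look like the following:

[source,python]
----
import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(
    pd.DataFrame({"user_id": [1, 2, 3, 4], "amount": [10.0, 20.0, 5.0, 7.5]}),
    npartitions=2,
)

print(ddf.known_divisions)  # True here, since from_pandas sorts the index

# Repartition by a different column; this triggers a shuffle.
by_user = ddf.set_index("user_id")
print(by_user.divisions)
----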
Tip
The "right" partitioner for a dataset depends on not only the data but also your operations.
Shuffling refers to transferring data between different workers in order to repartition it. Shuffling can be the result of an explicit operation, like calling repartition, or an implicit one, like grouping data together by key or performing an aggregation. Shuffles tend to be relatively expensive, so it’s useful to minimize how often they are needed and also to reduce the amount of data they move.
The most straightforward way to understand shuffles is the case in which you explicitly ask Dask to repartition your data. In those cases you generally see many-to-many worker communication, with the majority of the data needing to be moved over the network. This is naturally more expensive than situations in which data can be processed locally, as the network is much slower than RAM.
Another important way that you can trigger shuffles is implicitly through a reduction/aggregation. In such cases, if parts of the reduction or aggregation can be applied prior to moving the data around, Dask is able to transfer less data over the network, making for a faster shuffle.
Tip
Sometimes you’ll see things referred to as map-side and reduce-side; this just means before and after the shuffle.
We’ll explore more about how to minimize the impact of shuffles in the next two chapters, where we introduce aggregations.
So far you’ve seen how to control partitioning when creating a collection from local data, as well as how to change the partitioning of an existing distributed collection. Partitioning during the creation of a collection from delayed tasks is generally 1:1, with each delayed task becoming its own partition. When loading data from files, partitioning becomes a bit more complicated, as it involves the file layout and compression. Generally speaking, it is good practice to look at the partitioning of data you have loaded by checking npartitions for bags, chunks for arrays, or the index for DataFrames.
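For example, a quick sketch of inspecting partitioning across the three collection types might look like this:

[source,python]
----
import numpy as np
import pandas as pd
import dask.bag as db
import dask.array as da
import dask.dataframe as dd

bag = db.from_sequence(range(100), npartitions=4)
arr = da.from_array(np.arange(100), chunks=25)
ddf = dd.from_pandas(pd.DataFrame({"x": range(100)}), npartitions=4)

print(bag.npartitions)                 # number of bag partitions
print(arr.chunks)                      # chunk sizes per dimension
print(ddf.npartitions, ddf.divisions)  # partition count and index splits
----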
Tasks are the building blocks that Dask uses to implement dask.delayed, futures, and operations on Dask’s collections. Each task represents a small piece of computation that Dask cannot break down any further. Tasks are often fine-grained, and when computing a result Dask will try to combine multiple tasks into a single execution.
Most of Dask is lazily evaluated, with the exception of Dask futures. Lazy evaluation shifts the responsibility for combining computations from you to the scheduler, meaning that Dask will, when it makes sense, combine multiple function calls. Not only that, but if only some parts of a structure are needed, Dask is sometimes able to optimize by evaluating just the relevant parts (as with head or tail calls).[9] Implementing lazy evaluation requires Dask to construct a task graph. This task graph is also reused for fault tolerance.
Unlike the rest of Dask, futures are eagerly evaluated, which limits the optimizations available when chaining them together, since the scheduler has a less complete view of the world when it starts executing the first future. Futures still create task graphs, and you can verify this by visualizing them, as we’ll see in the next section.
In addition to nested tasks, as seen in [nested_tasks], you can also use a dask.delayed object as input to another delayed computation (see Task dependencies), and Dask’s submit/compute functions will construct the task graph for you.
link:./examples/dask/Dask-Ch3-Concepts.py[role=include]
Now when you go to compute the final combined value, Dask will compute all of the other values that are needed for the final function using its implicit task graph.
Note
You don’t need to pass around real values. For example, if one function updates a database and you want to run another function after that, you can use it as a parameter even if you don’t actually need its Python return value.
By passing delayed objects into other delayed function calls, you allow Dask to re-use shared nodes in the task graph and potentially reduce network overhead.
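As a sketch, with hypothetical load_data, total, and maximum functions, sharing a delayed parent might look like the following:

[source,python]
----
import dask
from dask import delayed

@delayed
def load_data():
    return list(range(1_000))

@delayed
def total(data):
    return sum(data)

@delayed
def maximum(data):
    return max(data)

data = load_data()        # shared parent
sum_task = total(data)    # depends on data
max_task = maximum(data)  # also depends on data

# Computing both together lets Dask run load_data only once.
sum_result, max_result = dask.compute(sum_task, max_task)
----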
Visualizing the task graph is an excellent tool to use while learning about task graphs and for debugging in the future. The visualize function is defined both in the Dask library and on all Dask objects. Instead of calling .visualize separately on multiple objects, you should call dask.visualize with the list of objects you are planning to compute, to see how Dask combines the task graph.
You should try this out now by visualizing everyone’s favorite word count example from the previous chapter. When you call dask.visualize on words_bag.frequencies(), you should get a result that looks something like Visualized word count task graph.
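If you’d like a self-contained variant, a sketch using a small stand-in for words_bag might look like this (rendering the graph requires the graphviz package):

[source,python]
----
import dask
import dask.bag as db

# A stand-in for the previous chapter's words_bag.
words_bag = db.from_sequence(["dask", "scales", "python", "dask"])

# Render the task graph for the frequencies computation.
dask.visualize(words_bag.frequencies(), filename="wordcount_graph")
----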
Tip
The Dask UI also shows visualized representations of the task graph, without the need to modify your code.
Intermediate task results are generally removed as soon as the dependent task has started to execute. This can be less than optimal when you need to perform multiple computations on the same piece of data. One solution is to combine all of your execution into a single call to dask.compute, so that Dask is able to keep the data around as needed. This breaks down in both the interactive case, where you don’t know in advance what your computation is going to be, and in iterative cases. In those cases, some form of caching or persistence can be beneficial. You will learn how to apply caching later in this chapter.
Dask uses a centralized scheduler, which is a common technique for many systems. It does mean, however, that while the typical task scheduling overhead is around 1 ms per task, as the number of tasks in a system increases the scheduler can become a bottleneck and the overhead can grow. Counterintuitively, this means that as your system grows, you may benefit from larger, coarser-grained tasks.
Sometimes the task graph itself can become too much for Dask to handle. This issue can show up as an out-of-memory exception on the client or scheduler or, more commonly, as jobs that slow down with iterations. Most frequently this occurs with recursive algorithms. One common example of a situation in which the graph can become too expensive to keep is distributed alternating least squares.
The first step when encountering a situation with a too-large task graph is to see if you can reduce the parallelism by using larger chunks of work or by switching the algorithm. For example, if we think of Fibonacci numbers computed with recursion, a better option would be using a dynamic programming or memoized solution instead of trying to distribute the computation task with Dask.
If you have an iterative algorithm, and there isn’t a better way to accomplish what you want, you can help Dask out by periodically writing out the intermediate work and re-loading it.[10] By doing this, Dask does not have to keep track of all of the steps involved in creating the data, but instead just needs to remember where the data is. The next two chapters will look at how to write and load data efficiently for these and other purposes.
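As a rough sketch, checkpointing a Dask DataFrame might look like the following; the path is illustrative, and writing Parquet assumes pyarrow or fastparquet is installed:

[source,python]
----
import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(pd.DataFrame({"x": range(1_000)}), npartitions=4)

# ... imagine many iterations of updates to ddf here ...

# Write the intermediate result out and reload it, so the task graph
# restarts from the files rather than from the full history of computation.
ddf.to_parquet("checkpoint/step_10/")
ddf = dd.read_parquet("checkpoint/step_10/")
----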
Tip
In Spark the equivalent idea is expressed as checkpointing.
To take the most advantage of Dask’s graph optimization, it’s important to submit your work in larger batches. First off, when you’re blocking on the result with dask.compute, small batches can limit the parallelism. If you have a shared parent (say, two results computed on the same data), submitting the computations together allows Dask to share the computation of the underlying data. You can verify whether Dask is able to share a common node by calling visualize on your list of tasks (e.g., if you take Examples #wc_freq and #wc_func and visualize both tasks together, you’ll see the shared node in Visualized word count task graph).
Sometimes you can’t submit the computations together, but you still know that you want to reuse some data. In those cases you should explore persistence.
Persistence allows you to keep specified Dask collections in memory on the cluster. To persist a collection for future reuse, just call dask.persist on the collection. If you choose persistence, you are responsible for telling Dask when you are done with a distributed collection. Unlike Spark, Dask does not have an easy unpersist equivalent; instead, you need to release the underlying futures for each partition, as shown in Manual persistence and memory management with Dask.
link:./examples/dask/Dask-Ch3-Concepts.py[role=include]
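A hedged sketch of this pattern, assuming a running distributed client and using client.cancel to release the persisted partitions, might look like the following:

[source,python]
----
import pandas as pd
import dask
import dask.dataframe as dd
from dask.distributed import Client

client = Client()  # distributed scheduler required for cluster persistence
ddf = dd.from_pandas(pd.DataFrame({"x": range(1_000)}), npartitions=4)

# Keep the collection in cluster memory for reuse.
ddf = dask.persist(ddf)[0]
print(ddf.x.sum().compute())
print(ddf.x.mean().compute())  # reuses the persisted partitions

# When finished, release the underlying futures.
client.cancel(ddf)
----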
Warning
A common mistake is to persist and cache things that are used only once or are inexpensive to compute.
Dask’s local mode has a best-effort caching system based on cachey. Since this only works in local mode, we won’t go into the details, but if you are running in local mode, you can take a look at the local cache documentation.
Warning
Dask does not raise an error when you attempt to use Dask caching in a distributed fashion; it just won’t work. So when migrating code from local to distributed, make sure to check for usage of Dask’s local caching.
In distributed systems like Dask, fault tolerance generally refers to how a system handles computer, network, or program failures. Fault tolerance becomes increasingly important the more computers you use. When you are using Dask on a single computer, the concept of fault tolerance is less important, since if your one computer fails, there is nothing to recover. However, when you have hundreds of machines, the odds of a machine failing go up. Dask’s task graph is used to provide its fault tolerance.[11] There are many different kinds of failures in a distributed system, but thankfully many of them can be handled in the same way.
Dask automatically retries tasks when the scheduler loses connection to the worker. This retry is accomplished by using the same graph of computation Dask uses for lazy evaluation.
Warning
The Dask client is not fault tolerant to network issues connecting to the scheduler. One mitigation technique you can use is to run your client in the same network as the scheduler.
Machine failure is a fact of life with distributed systems. When a worker fails, Dask will treat it in the same way as a network failure, retrying any necessary tasks. However, Dask cannot recover from failures of the scheduler or of your client code.[12] This makes it important that, when you are running in a shared environment, you run your client and scheduler nodes at a high priority to avoid preemption.
Dask automatically retries work lost to software failures that exit or crash the worker. Much like machine failure, from Dask’s point of view a worker exiting and a network failing look the same.
IOError and OSError exceptions are the only two classes of exceptions Dask will retry. If your worker process raises one of these errors, they are pickled and transferred over to the scheduler. Dask’s scheduler then retries the task. If your code encounters an IOError that should not be retried (e.g., a web page doesn’t exist), you’ll need to wrap it in another exception to keep Dask from retrying it.
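For example, a sketch of wrapping a fetch that should not be retried on a missing page might look like this:

[source,python]
----
import urllib.request


def fetch_page(url):
    try:
        with urllib.request.urlopen(url) as page:
            return page.read()
    except IOError as e:
        # IOError/OSError would be retried by Dask; re-raise as a
        # different exception type so a missing page fails immediately.
        raise RuntimeError(f"Failed to fetch {url}: {e}") from e
----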
Since Dask retries failed computation, it’s important to be careful with side effects or mutated values. For example, if you have a Dask bag of transactions and you update a database as part of a map, Dask might re-execute some of the operations on that bag multiple times, resulting in the update to the database happening more than once. If we think of a withdrawal from an ATM, we can see how this could result in some unhappy customers and incorrect data. Instead, if you need to mutate small bits of data, you can bring them back to a local collection.
If your program encounters any other exceptions, Dask will return the exception to your main thread.[13]
After this chapter you should have a good grasp of how Dask is able to scale your Python code. You should now understand the basics of partitioning, why this matters, task sizing, and Dask’s approach to fault tolerance. This will hopefully set you up well for deciding when to apply Dask, and for the next few chapters, where we do a deeper dive into Dask’s collection libraries. In the next chapter we’ll focus on Dask’s DataFrames, as they are the most full-featured of Dask’s distributed collections.