We are so happy that you’ve decided to explore whether Dask is the system for you by trying it out. In this chapter, we will focus on getting started with Dask in its local mode. Using this, we’ll explore a few more straightforward parallel computing tasks (including everyone’s favorite, word count).[1]
Installing Dask locally is reasonably straightforward. If you want to begin running on multiple machines, doing so is often easier when you start with a conda environment (or virtualenv). This lets you figure out what packages you depend on by running pip freeze to make sure they’re on all of the workers when it’s time to scale.
While you can just run pip install -U dask, we prefer using a conda environment, since it’s easier to match the version of Python to that on a cluster, which allows us to connect a local machine to the cluster directly.[2] If you don’t already have conda on your machine, Miniforge is a good and quick way to get conda installed across multiple platforms. The installation of Dask into a new conda environment is shown in Installing Dask into a new conda environment.
link:./examples/dask/setup-dask-user.sh[role=include]
Here we install a specific version of Dask rather than just the latest version. If you’re planning to connect to a cluster later on, it will be useful to pick the same version of Dask as is installed on the cluster.
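If you want to double-check the result once the environment is activated, a quick sanity check is to import Dask and print its version:

[source,python]
----
# Quick sanity check: confirm Dask imports and see which version was installed.
import dask

print(dask.__version__)
----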
Note
|
You don’t have to install Dask locally. You can run Dask through a hosted BinderHub example (including one from the creators of Dask, with distributed options) or through other providers such as SaturnCloud. That being said, we recommend having Dask installed locally even if you end up using one of these services. |
Another way to get Dask running locally is to use example Docker images maintained by the Dask project. The benefit of this approach is that the same image can then be used in a distributed cluster, each node running the same Docker image locally, thus ensuring the compatibility of all the packages. Advanced users can use the Dask example Docker images as a base and add packages of their choice before committing changes and saving it as a new image.
Now that you have Dask installed locally, it’s time to try the versions of "Hello World" available through its various APIs. There are many different options for starting Dask. For now, you should use LocalCluster, as shown in Using LocalCluster to start Dask.
link:./examples/dask/Dask-Ch2-Hello-Worlds.py[role=include]
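The included example lives in the accompanying file; as a minimal sketch, starting Dask in local mode with LocalCluster looks roughly like the following (the worker and thread counts here are only illustrative; by default Dask picks values based on your machine):

[source,python]
----
from dask.distributed import Client, LocalCluster

# Start a scheduler and workers on this machine; the counts are illustrative.
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)
print(client)  # prints the dashboard link and a worker/memory summary
----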
One of the core building blocks of Dask is dask.delayed, which allows you to run functions in parallel. If you are running Dask on multiple machines, these functions can also be distributed (or spread out) across the different machines. When you wrap a function with dask.delayed and call it, you get back a "delayed" object representing the desired computation. When you create a delayed object, Dask is just making a note of what you might want it to do. As with a lazy teenager, you need to be explicit. You can force Dask to start computing the value with client.compute, which produces a "future." You can use dask.compute both to start computing the delayed objects and futures and to return their values.[3]
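As a minimal sketch of those pieces together (assuming a local client as above):

[source,python]
----
import dask
from dask.distributed import Client

client = Client()  # starts a local cluster if none is running


@dask.delayed
def add(x, y):
    return x + y


lazy = add(1, 2)               # a delayed object; no work has happened yet
future = client.compute(lazy)  # hand it to the scheduler and get back a future
print(future.result())         # 3, blocking until the future finishes
print(dask.compute(lazy))      # (3,); compute also works directly on delayed objects
----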
An easy way to see the performance difference is by writing an intentionally slow function, like slow_task, which calls sleep. Then you can compare the performance of Dask to "regular" Python by mapping the function over a few elements with and without dask.delayed, as shown in Sleepy task.
link:./examples/dask/Dask-Ch2-Hello-Worlds.py[role=include]
When we run this example, we get In sequence 20.01662155520171, in parallel 6.259156636893749, which shows that Dask can run some of the tasks in parallel, but not all of them.[4]
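A rough sketch of that comparison, assuming slow_task just sleeps for a couple of seconds per element, might look like this (your timings will differ):

[source,python]
----
import time
import timeit

import dask


def slow_task(x):
    time.sleep(2)  # stand-in for real work
    return x


things = list(range(10))

# Sequential: roughly two seconds per element.
in_sequence = timeit.timeit(lambda: [slow_task(x) for x in things], number=1)

# Parallel: build delayed objects, then compute them all at once.
lazy = [dask.delayed(slow_task)(x) for x in things]
in_parallel = timeit.timeit(lambda: dask.compute(*lazy), number=1)

print(f"In sequence {in_sequence}, in parallel {in_parallel}")
----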
One of the neat things about dask.delayed is that you can launch tasks inside of other tasks.[5] A straightforward real-world example of this is a web crawler: when you visit a web page, you want to fetch all of the links from that page, as shown in Web crawler.
link:./examples/dask/Dask-Ch2-Hello-Worlds.py[role=include]
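The full crawler is in the included example; the simplified sketch below keeps only the nesting idea, with hypothetical stand-ins for fetching the page and extracting its links:

[source,python]
----
import dask


@dask.delayed
def crawl(url, depth=0, maxdepth=1):
    # Hypothetical stand-ins: real code would fetch the page and parse its
    # links (for example, with requests and BeautifulSoup).
    text = f"contents of {url}"
    child_links = [f"{url}/child{i}" for i in range(2)] if depth < maxdepth else []

    # Launching tasks inside a task: each linked page becomes its own
    # delayed crawl, computed before this task returns.
    children = [crawl(link, depth=depth + 1, maxdepth=maxdepth)
                for link in child_links]
    pages = [(url, text)]
    for child_pages in dask.compute(*children):
        pages += child_pages
    return pages
----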
Note
|
In practice, some central co-ordination is still involved behind the scenes (including the scheduler), but the freedom to write your code in this nested way is quite powerful. |
We cover other kinds of task dependencies in [task_deps].
In addition to the low-level task APIs, Dask also has distributed collections. These collections enable you to work with data that would be too large to fit on a single machine and to naturally distribute work on it, which is called data parallelism. Dask has both an unordered collection called a bag, and an ordered collection called an array. Dask arrays aim to implement some of the ndarray interface, whereas bags focus more on functional programming (e.g., things like map and filter). You can load Dask collections from files, take local collections and distribute them, or take the results of dask.delayed tasks and turn them into a collection.
In distributed collections, Dask splits the data up using partitions. Partitions are used to decrease the scheduling cost compared to operating on individual rows, which is covered in more detail in [basic_partitioning].
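For a tiny illustration of partitioning, a bag built from a local sequence is split into partitions that Dask schedules as units (the partition count here is arbitrary):

[source,python]
----
import dask.bag as db

b = db.from_sequence(range(100), npartitions=4)
print(b.npartitions)                   # 4: work is scheduled per partition
print(b.map(lambda x: x * 2).take(3))  # (0, 2, 4)
----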
Dask arrays allow you to go beyond what can fit in memory, or even on disk, on a single computer. Many of the standard NumPy operations are supported out of the box, including aggregates such as average and standard deviation. The from_array function in Dask arrays converts a local array-like collection into a distributed collection. Creating a distributed array and computing the average shows how to create a distributed array from a local one and then compute the average.
link:./examples/dask/Dask-Ch2-Hello-Worlds.py[role=include]
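A minimal sketch of the same idea, with an illustrative chunk size:

[source,python]
----
import numpy as np
import dask.array as da

local = np.arange(100_000)
distributed = da.from_array(local, chunks=10_000)  # chunk size is illustrative
print(distributed.mean().compute())                # built lazily, computed on demand
----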
As with all distributed collections, what is expensive on a Dask array is not the same as what is expensive on a local array. In the next chapter you’ll learn a bit more about how Dask arrays are implemented and hopefully gain a better intuition around their performance.
Creating a distributed collection from a local collection uses two fundamental building blocks of distributed computing, together known as the scatter-gather pattern. While the originating dataset must fit on a single, local machine, distributing it already expands the number of processors you have at your disposal, as well as the intermediate memory you can use, enabling you to better exploit modern cloud infrastructure and scale. A practical use case is a distributed web crawler, where the list of seed URLs to crawl might be a small dataset, but the memory needed while crawling might be an order of magnitude larger, requiring distributed computing.
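On the distributed client, the two halves of that pattern are exposed directly as scatter and gather; a small sketch (the seed URLs are hypothetical):

[source,python]
----
from dask.distributed import Client

client = Client()
seed_urls = [f"http://example.com/{i}" for i in range(8)]  # hypothetical seeds

# Scatter: push the local data out to the workers, getting futures back.
scattered = client.scatter(seed_urls)

# ...submit work against the scattered data here...

# Gather: bring results (or the scattered pieces) back to the local machine.
local_again = client.gather(scattered)
----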
Dask bags implement more of the functional programming interfaces than Dask arrays. The "Hello World" of big data is word count, which is easier to implement with functional programming interfaces. Since you’ve already made a crawler function, you can turn its output into a Dask bag using the from_delayed function in Turning the crawler function’s output into a Dask bag.
link:./examples/dask/Dask-Ch2-Hello-Worlds.py[role=include]
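Assuming the delayed crawl function sketched earlier (and hypothetical seed URLs), that looks roughly like:

[source,python]
----
import dask.bag as db

# Each delayed call becomes one partition of the resulting bag.
delayed_pages = [crawl(url) for url in ["http://example.com", "http://example.org"]]
pages_bag = db.from_delayed(delayed_pages)
----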
Now that you have a Dask bag collection, you can build everyone’s favorite word count example on top of it. The first step is to turn your bag of text into a bag of words, which you do by using map (see Turning a bag of text into a bag of words). Once you have the bag of words, you can either use Dask’s built-in frequencies method (see Using Dask’s built-in frequencies method) or write your own frequency count using functional transformations (see Using functional transformations to write a custom frequency method).
link:./examples/dask/Dask-Ch2-Hello-Worlds.py[role=include]
link:./examples/dask/Dask-Ch2-Hello-Worlds.py[role=include]
link:./examples/dask/Dask-Ch2-Hello-Worlds.py[role=include]
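A condensed sketch of all three steps, assuming a bag of plain text strings rather than crawler output:

[source,python]
----
import dask.bag as db

text = db.from_sequence(["the quick brown fox", "the lazy dog", "the end"])

# Bag of text -> bag of words.
words = text.map(lambda line: line.split(" ")).flatten()

# Built-in: frequencies returns (word, count) pairs.
counts_builtin = words.frequencies()

# By hand: foldby groups by the word and folds a running count per group.
counts_custom = words.foldby(
    key=lambda word: word,
    binop=lambda total, _word: total + 1,
    initial=0,
    combine=lambda a, b: a + b,
    combine_initial=0,
)

print(dict(counts_builtin.compute()))
print(dict(counts_custom.compute()))
----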
On Dask bags, foldby, frequencies, and many other reductions return a single-partition bag, meaning that the data after the reduction needs to fit on a single computer. Dask DataFrames handle reductions differently and don’t have that same restriction.
Pandas is one of the most popular Python data libraries, and Dask has a DataFrame library that implements much of the pandas API. Thanks to Python’s duck-typing, you can often use Dask’s distributed DataFrame library in place of pandas. Not all of the API will work exactly the same, and some parts are not implemented, so be sure you have good test coverage.
Warning
|
Your intuition around what’s slow and fast with pandas does not carry over. We will explore this more in [dask_df]. |
To illustrate how you can use Dask DataFrame, we’ll rework Examples #make_bag_of_crawler through #wc_freq to use it. As with Dask’s other collections, you can create DataFrames from local collections, futures, or distributed files. Since you’ve already made a crawler function, you can turn its output into a Dask bag using the from_delayed function from Turning the crawler function’s output into a Dask bag. Instead of using map and foldby, you can use pandas APIs such as explode and value_counts, as shown in DataFrame word count.
link:./examples/dask/Dask-Ch2-Hello-Worlds.py[role=include]
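A condensed sketch of the DataFrame version, again assuming a column of plain text strings:

[source,python]
----
import pandas as pd
import dask.dataframe as dd

local = pd.DataFrame({"text": ["the quick brown fox", "the lazy dog", "the end"]})
ddf = dd.from_pandas(local, npartitions=2)

# Split each line into words, explode to one word per row, and count.
words = ddf["text"].str.split(" ").explode()
counts = words.value_counts()
print(counts.compute())
----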
In this chapter you got Dask working on your local machine and took a tour of the different "Hello World" (or getting started) examples for most of Dask’s built-in libraries. Subsequent chapters will dive into these different tools in more detail.
Now that you’ve got Dask working on your local machine, you might want to jump on over to [ch12] and look at the different deployment mechanisms. For the most part, you can run the examples in local mode, albeit sometimes a little slower or at a smaller scale. However, the next chapter will look at the core concepts of Dask, and one of the upcoming examples emphasizes the benefits of having Dask running on multiple machines and is also generally easier to explore on a cluster. If you don’t have a cluster available, you may wish to set up a simulated one using something like MicroK8s.