So far you’ve seen the basics of how Dask is built as well as how Dask uses these building blocks to support data science with DataFrames. This chapter explores where Dask’s bag and array interfaces—often overlooked, relative to DataFrames—are more appropriate. As mentioned in [hello_worlds], Dask bags implement common functional APIs, and Dask arrays implement a subset of NumPy arrays.
Tip
Understanding partitioning is important for understanding collections. If you skipped [basic_partitioning], now is a good time to head back and take a look.
Dask arrays implement a subset of the NumPy ndarray interface, making them ideal for porting code that uses NumPy to run on Dask. Much of your understanding from the previous chapter with DataFrames carries over to Dask arrays, as well as much of your understanding of ndarrays.
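For example, a minimal sketch (not tied to any particular dataset) of NumPy-style operations running on a chunked Dask array might look like this:

[source,python]
----
import dask.array as da

# A chunked random array; operations mirror what you would write with NumPy.
x = da.random.random((100_000, 1_000), chunks=(10_000, 1_000))

# Column-wise standardization, evaluated lazily until compute() is called.
standardized = (x - x.mean(axis=0)) / x.std(axis=0)
print(standardized.sum().compute())
----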
Some common use cases for Dask arrays include:
- Large-scale imaging and astronomy data
- Weather data
- Multi-dimensional data
As with Dask DataFrames and pandas, if you wouldn’t use a NumPy array for the problem at a smaller scale, a Dask array may not be the right solution.
If your data fits in memory on a single computer, using Dask arrays is unlikely to give you many benefits over NumPy arrays, especially compared to local accelerators like Numba. Numba is well suited to vectorizing and parallelizing local tasks with and without Graphics Processing Units (GPUs). You can use Numba with or without Dask, and we’ll look at how to further speed up Dask arrays using Numba in [ch10].
Dask arrays, like their local counterpart, require that data is all of the same type. This means that they cannot be used for semi-structured or mixed-type data (think strings and ints).
As with Dask DataFrames, loading and writing functions use read_ and to_ as their prefixes. Each format has its own configuration, but in general, the first positional argument is the location of the data to be read. The location can be a wildcard path of files (e.g., s3://test-bucket/magic/*), a list of files, or a regular file location.
Dask arrays support reading the following formats:
- zarr
- npy stacks (local disk only)
And reading from and writing to:
- hdf5
- zarr
- tiledb
- npy stacks (local disk only)
In addition, you can convert Dask arrays to/from Dask bags and DataFrames (provided the types are compatible). As you may have noted, Dask does not support reading arrays from as many formats as you might expect, which provides the opportunity for an excellent use of bags (covered in the next section).
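As a minimal sketch, assuming a hypothetical local zarr store, reading, converting, and writing look like the following:

[source,python]
----
import dask.array as da

# Read a (hypothetical) zarr store into a chunked Dask array.
distances = da.from_zarr("distances.zarr")

# Convert to a Dask DataFrame (types permitting) for relational-style work.
ddf = distances.to_dask_dataframe()

# Write the array back out in another supported format.
da.to_zarr(distances, "distances_copy.zarr")
----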
While Dask arrays implement much of the ndarray API, the coverage is not complete. As with Dask DataFrames, some of the omissions are intentional (e.g., sort, much of linalg, and other operations that would be slow), and other parts are simply missing because no one has had the time to implement them yet.
Since, as with distributed DataFrames, the partitioned nature of Dask arrays makes performance a bit different, there are some unique Dask array functions not found in NumPy:
map_overlap
- You can use this for any windowed view of the data, as with map_overlap on Dask DataFrames.
map_blocks
- This is similar to Dask DataFrames' map_partitions, and you can use it for implementing embarrassingly parallel operations that the standard Dask library has not yet implemented, including new element-wise functions in NumPy.
topk
- This returns the top k elements of the array in place of fully sorting it (which is much more expensive).[1]
compute_chunk_sizes
- Dask needs to know the chunk sizes to support indexing; if an array has unknown chunk sizes, you can call this function.
These special functions are not present on the underlying regular collections, as they do not offer the same performance savings in non-parallel/non-distributed environments.
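For instance, a minimal sketch of topk and map_blocks on a random array (np.sinc here is just a stand-in for an element-wise function Dask does not wrap directly):

[source,python]
----
import numpy as np
import dask.array as da

x = da.random.random((1_000_000,), chunks=100_000)

# topk returns the k largest elements without a full distributed sort.
largest = da.topk(x, 5)

# map_blocks applies a function to each chunk independently.
transformed = x.map_blocks(np.sinc)

print(largest.compute(), transformed.sum().compute())
----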
To continue to draw parallels to Python’s built-in data structures, you can think of bags as slightly different lists or sets. Bags are like lists except without the concept of order (so there are no indexing operations). Alternatively, if you think of bags as sets, the difference is that bags allow duplicates. Dask bags place the fewest restrictions on what they can contain and, correspondingly, have the smallest API. In fact, Examples #make_bag_of_crawler through #wc_func covered most of the core bag API.
Tip
For users coming from Apache Spark, Dask bags are most closely related to Spark’s RDDs.
Bags are an excellent choice when the structure of the data is unknown or otherwise not consistent. Some of the common use cases are as follows:
- Grouping together a bunch of dask.delayed calls—for example, for loading "messy" or unstructured (or unsupported) data.
- "Cleaning" (or adding structure to) unstructured data (like JSON).
- Parallelizing a group of tasks over a fixed range—for example, if you want to call an API 100 times but you are not picky about the details (see the sketch after this list).
- Catch-all: if the data doesn’t fit in any other collection type, bags are your friend.
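Here is a minimal sketch of the fixed-range use case; call_api is a hypothetical stand-in for whatever work you actually need to fan out:

[source,python]
----
import dask.bag as db

# Hypothetical placeholder for a real API call.
def call_api(i):
    return {"request": i, "status": "ok"}

# Spread 100 calls across 10 partitions and run them in parallel.
responses = db.from_sequence(range(100), npartitions=10).map(call_api)
results = responses.compute()
----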
We believe that the most common use case for Dask bags is loading messy data or data that Dask does not have built-in support for.
Dask bags have built-in readers for text files, with read_text, and avro files, with read_avro. Similarly, you can write Dask bags to text files and avro files, although the results must be serializable. Bags are commonly used when Dask’s built-in tools for reading data are not enough, so the next section will dive into how to go beyond these two built-in formats.
Normally, the goal when loading messy data is to get it into a structured format for further processing, or at least to extract the components that you are interested in. While your data formats will likely be a bit different, this section will look at loading some messy JSON and then extracting some relevant fields. Don’t worry—we call out places where different formats or sources may require different techniques.
For messy textual data, which is a common occurrence with JSON, you can save yourself some time by loading the data using the bag read_text function. The read_text function defaults to splitting up records by line; however, many formats cannot be processed line by line. To load each file as a single record rather than having it split up, you can set the linedelimiter parameter to a value that does not occur in the data. Often REST APIs return the interesting results as a subcomponent, so in Pre-processing JSON we load the US Food and Drug Administration (FDA) recall dataset and strip it down to the part we care about. The FDA recall dataset is a wonderful real-world example of the nested datasets often found in JSON data, which can be hard to process directly in DataFrames.
link:./examples/dask/Dask-Collections.py[role=include]
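A minimal sketch along the same lines as the included example, with a hypothetical local path standing in for the real dataset and the records assumed to be nested under a "results" key:

[source,python]
----
import json
import dask.bag as db

# Load each file as a single record by using a delimiter that never occurs.
raw = db.read_text("fda_recalls/*.json", linedelimiter="NO-SUCH-DELIMITER")

def extract_results(document):
    # The interesting records are nested under the "results" key.
    return json.loads(document).get("results", [])

recalls = raw.map(extract_results).flatten()
----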
If you need to load data from an unsupported source (like a custom storage system) or a binary format (like protocol buffers or Flexible Image Transport System), you’ll need to use lower-level APIs. For binary files that are still stored in an FSSPEC-supported filesystem like S3, you can try the pattern in Loading PDFs from an FSSPEC-supported filesystem.
link:./examples/dask/Dask-Collections.py[role=include]
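A rough sketch of that pattern, assuming a hypothetical bucket of PDFs and that the pypdf library is available on the workers:

[source,python]
----
import dask.bag as db
import fsspec

# OpenFile objects are serializable, so they can be shipped to workers.
files = fsspec.open_files("s3://test-bucket/pdfs/*.pdf", mode="rb")

def load_pdf_text(open_file):
    from pypdf import PdfReader  # imported on the worker
    with open_file as f:
        reader = PdfReader(f)
        return "".join(page.extract_text() or "" for page in reader.pages)

pdf_texts = db.from_sequence(files).map(load_pdf_text)
----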
If you are not using an FSSPEC-supported filesystem, you can still load the data as illustrated in Loading data using a purely custom function.
link:./examples/dask/Dask-Collections.py[role=include]
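A minimal sketch of that approach; fetch_partition and the partition keys are hypothetical placeholders for your own storage system’s API:

[source,python]
----
import dask.bag as db
from dask import delayed

# Hypothetical custom loader: talks to your storage system and
# returns one partition's worth of records as a list.
def fetch_partition(key):
    return [{"key": key, "value": None}]  # placeholder logic

keys = ["part-0", "part-1", "part-2"]  # hypothetical partition keys
custom_bag = db.from_delayed([delayed(fetch_partition)(k) for k in keys])
----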
Note
Loading data in this fashion requires that each file be able to fit inside a worker/executor. If that is not the case, things get much more complicated. Implementing splittable data readers is beyond the scope of this book, but you can take a look at Dask’s internal IO libraries (text is the easiest) to get some inspiration.
Sometimes with nested directory structures, creating the list of files can take a long time. In that case, it’s worthwhile to parallelize the listing of files as well. There are a number of different techniques to parallelize file listing, but for simplicity, we show parallel recursive listing in Listing the files in parallel (recursively).
link:./examples/dask/Dask-Collections.py[role=include]
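A minimal sketch of that idea, with a hypothetical local path: the client lists only the top level, and the workers list each subtree in parallel.

[source,python]
----
import dask.bag as db
import fsspec

fs = fsspec.filesystem("file")  # hypothetical filesystem; swap in s3, gcs, etc.

def list_subtree(path):
    return fs.find(path)  # recursively lists every file under path

# List only the first level locally; fan the subtrees out to the workers.
top_level = fs.ls("/data/nested", detail=False)
all_files = db.from_sequence(top_level).map(list_subtree).flatten()
filenames = all_files.compute()
----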
Tip
You don’t always have to do the directory listing yourself. It can be worthwhile to check whether there is a metastore, such as Hive or Iceberg, which can give you the list of files without all of these slow API calls.
This approach has some downsides: namely, all the filenames come back to a single point—but this is rarely an issue. However, if even just the list of your files is too big to fit in memory, you’ll want to try a recursive algorithm for directory discovery, followed by an iterative algorithm for file listing that keeps the names of the files in the bag.[2] The code becomes a bit more complex, as shown in Listing the files in parallel without collecting to the driver, so this last approach is rarely used.
link:./examples/dask/Dask-Collections.py[role=include]
A fully iterative algorithm with FSSPEC would not be any faster than the naive listing, since FSSPEC does not support querying just for directories.
Dask bags are not well suited to most reduction or shuffling operations, as their core reduction function reduces results down to one partition, requiring that all of the data fit on a single machine. You can reasonably use aggregations that are purely constant space, such as mean, min, and max. However, most of the time you find yourself trying to aggregate your data, you should consider transforming your bag into a DataFrame with bag.Bag.to_dataframe.
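For example, here is a minimal sketch (with hypothetical records) of the difference between a constant-space aggregation on the bag and a grouped aggregation after converting to a DataFrame:

[source,python]
----
import dask.bag as db

# Hypothetical records that share a consistent structure.
records = db.from_sequence(
    [{"state": "CA", "count": 3}, {"state": "NY", "count": 5}] * 100
)

# Constant-space aggregations are reasonable directly on the bag.
total = records.pluck("count").sum()

# For grouped aggregations, convert to a DataFrame first.
df = records.to_dataframe()
per_state = df.groupby("state")["count"].sum()
print(total.compute(), per_state.compute())
----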
Tip
All three Dask data types (bag, array, and DataFrame) have methods for being converted to other data types. However, some conversions require special attention. For example, when converting a Dask DataFrame to a Dask array, the resulting array will have unknown chunk sizes (which you can resolve by calling compute_chunk_sizes).
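A minimal sketch of that situation, using small hypothetical data:

[source,python]
----
import pandas as pd
import dask.dataframe as dd

# Hypothetical DataFrame; after conversion, the chunk sizes along the
# row axis are unknown until we compute them.
pdf = pd.DataFrame({"a": range(1_000), "b": range(1_000)})
ddf = dd.from_pandas(pdf, npartitions=4)

arr = ddf.to_dask_array()
arr.compute_chunk_sizes()  # after this, indexing like arr[42] works
----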
While Dask DataFrames get the most use, Dask arrays and bags have their place. You can use Dask arrays to speed up and parallelize large multi-dimensional array computations, and Dask bags allow you to work with data that doesn’t fit nicely into a DataFrame, like PDFs or multi-dimensional nested data. These collections get much less focus and active development than Dask DataFrames but can still earn a spot in your workflows. In the next chapter, you will see how you can add state to your Dask programs, including with operations on Dask’s collections.