Despite, or perhaps because of, the data ecosystem's rapid advances, you will likely end up needing to use multiple tools as part of your data pipeline. Ray Datasets allow data sharing between tools in the data and ML ecosystems, letting you switch tools without having to manage copying data. Ray Datasets support Spark, Modin, Dask, and Mars, and can also be used with ML tools like TensorFlow. You can also use Apache Arrow with Ray to allow more tools to work on top of Datasets, like R or even MATLAB. Ray Datasets act as a common format for all steps of your machine learning pipeline, simplifying legacy pipelines.
All this boils down to: you can use the same dataset in multiple tools without worrying about the details. Internally, many of these tools have their own formats, but Ray and Apache Arrow manage the translations transparently.
In addition to simplifying your usage of different tools, Ray also has a growing collection of built-in operations for Datasets. These built-in operations are being actively developed and are not intended to be as full-featured as those of the data tools built on top of Ray.
Tip
|
As covered in [ray_objects], Ray Datasets' default behavior may be different than you expect. You can enable object recovery by setting |
Ray Datasets continue to be an area of active development, including large feature additions between minor releases, and there will likely be more functionality added by the time you are reading this chapter. Regardless, the fundamental principles of partitioning and multi-tool interoperability will remain the same.
As you saw in [ds_hello], you can create Datasets from local collections by calling ray.data.from_items. However, local collections naturally limit the scope of data that you can handle, so Ray supports many other options.
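For instance, a minimal sketch (the records are purely illustrative):

[source,python]
----
import ray

ray.init()

# A tiny in-memory Dataset built from a Python list of records.
ds = ray.data.from_items(
    [{"name": "Anna", "score": 10}, {"name": "Bob", "score": 7}]
)
print(ds.count())  # 2
----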
Apache Arrow defines a language-independent columnar memory format for flat and hierarchical data. The key components of Arrow include:
- Rich data type sets covering both SQL and JSON types, such as int, BigInt, decimal, varchar, map, struct, and array.
- Columnar in-memory representations that support arbitrarily complex record structures built on top of the defined data types.
- Support for data structures including pick-lists[1], hash tables, and queues.
- Usage of shared memory, TCP/IP, and RDMA for interprocess data exchange.
- Data libraries for reading and writing columnar data in multiple languages, including Java, C++, Python, Ruby, Rust, Go, and JavaScript.
- Algorithms for various operations including bitmap selection, hashing, filtering, bucketing, sorting, and matching.
- Increased memory efficiency through columnar in-memory compression.
- Memory persistence tools for short-term persistence through non-volatile memory, SSD, or HDD.
Ray uses Apache Arrow to load external data into Datasets, which supports a number of different file formats and file systems. The formats, at present, are csv, json, parquet, numpy, text, and raw binary. The functions for loading data follow the read_[format] pattern and are in the ray.data module.
link:examples/ray_examples/data/Ray-DataSets.py[role=include]
When loading, you can specify a target parallelism, but Ray may be limited by the number of files being loaded. Picking a good value for your target parallelism is complicated and depends on a number of factors. You want to ensure that your data can fit easily in memory and take advantage of all of the machines in your cluster, while also not picking a number so high that the overhead of launching individual tasks exceeds the benefits. Generally, parallelism resulting in splits between hundreds of megabytes and tens of gigabytes is often considered a sweet spot.
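As a rough sketch of specifying a target parallelism (the path is hypothetical, and exact parameter behavior may vary across Ray versions):

[source,python]
----
import ray

# Ask Ray to aim for 64 blocks/partitions when loading; the actual
# number may be capped by the number of underlying files.
ds = ray.data.read_parquet(
    "s3://example-bucket/mydata/",  # hypothetical path
    parallelism=64,
)
----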
Tip
|
If you wish to customize how Arrow loads your data, you can pass additional arguments, like |
Arrow has built-in native/"fast" support for S3, HDFS, and "regular" filesystems. Ray automatically selects the correct built-in filesystem driver based on the path.
Warning
|
When loading from a local filesystem, it is up to you to ensure the file is available on all of the workers when running Ray in distributed mode. |
Arrow, and by extension Ray, also uses fsspec which supports a wider array of filesystems, including https (when aiohttp is installed). Unlike with the "built-in" file systems, you need to manually specify the filesystem, as shown in Load Data Over HTTPS.
link:examples/ray_examples/data/Ray-DataSets.py[role=include]
Warning
|
At present, the protocol is incorrectly stripped off, so you need to specify it twice; for example, when loading data from an HTTPS website you would load from |
Ray can write in all of the formats it can read from. The writing functions, like the read functions, follow a pattern of write_[format]. There are a few minor differences between the read path and the write path. Instead of taking a parallelism parameter, the write path always writes with the parallelism of the input Dataset:
link:examples/ray_examples/data/Ray-DataSets.py[role=include]
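A short sketch of the write path (the output paths are hypothetical):

[source,python]
----
# One file is written per partition of the input Dataset.
ds.write_parquet("s3://example-bucket/output/")  # hypothetical path

# To control the number of output files, repartition first.
ds.repartition(1).write_csv("/tmp/single_output/")
----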
If Ray does not have I/O support for your desired format or filesystem, you should check whether any of the other tools that Ray supports do. Then, as covered in the next section, you can convert your Dataset to and from the desired tool.
Ray has built-in tooling to share data between the different data tools running on Ray. Most of these tools have their own internal representations of the data, but Ray handles converting the data as needed. Before you first use a Dataset with Spark or Dask, you need to run a bit of "setup" code so that they delegate their execution to Ray, as shown in Setup Dask on Ray and Setup Spark on Ray.
link:examples/ray_examples/data/Ray-DataSets.py[role=include]
link:examples/ray_examples/data/Ray-DataSets.py[role=include]
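For reference, that setup code typically looks something like the following sketch, assuming the ray.util.dask helper and the RayDP package; exact APIs and parameters vary by version:

[source,python]
----
import ray

ray.init()

# Dask: route Dask's task-graph execution through Ray's scheduler.
from ray.util.dask import enable_dask_on_ray

enable_dask_on_ray()

# Spark: RayDP starts a Spark session whose executors run inside Ray.
import raydp

spark = raydp.init_spark(
    app_name="ray_book",
    num_executors=2,
    executor_cores=2,
    executor_memory="2GB",
)
----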
As with the functions to read/load datasets, transfer-to-Ray functions are defined on the ray.data module and follow the from_[x] pattern, where [x] is the tool name. Similarly to writing data, you convert Datasets to a tool with a to_[x] function defined on the Dataset, where [x] is the tool name.
link:examples/ray_examples/data/Ray-DataSets.py[role=include]
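A sketch of the round trip with Dask, assuming Dask on Ray has already been set up as above:

[source,python]
----
import dask.dataframe as dd
import pandas as pd
import ray

# Into a Ray Dataset: from_[x]
dask_df = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=2)
ds = ray.data.from_dask(dask_df)

# Back out of a Ray Dataset: to_[x]
dask_df2 = ds.to_dask()
----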
Note
|
Datasets do not use Ray’s runtime environments for dependencies, so you must have your desired tools installed in your worker image, see [appB]. This is more involved for Spark as it requires the JVM and other non-Python components. |
You are not limited to the tools that are built into Ray. If you have a new tool that supports Apache Arrow, and you are using Apache Arrow supported types, to_arrow_refs() gives you a zero-copy Arrow representation of your dataset. You can then use this list of Ray Arrow objects to pass into your tool, be it model training or any other purpose. You will learn more about this in the "Built-in Ray DataSet operations" section.
Many tools and languages can be connected with Arrow and Ray.
Tip
|
Dask and Spark both have non-DataFrame collections (Bags, Arrays, and RDDs) that cannot be converted with these APIs. |
This section assumes you have a good understanding of the data wrangling tools you’re going to use with Ray, either pandas or Apache Spark. Pandas is ideal for users scaling Python analytics, and Spark is well suited for users connecting to big data tools. If you’re not familiar with the pandas APIs, you should check out Python for Data Analysis: Data Wrangling with Pandas, NumPy, and IPython by Wes McKinney. New Spark users should check out Learning Spark, and if you want to go super deep, Holden recommends High Performance Spark by Holden and Rachel[2].
Partitioning gives you the ability to control the number of tasks used to process your data. If you have billions of rows, using a single task to process them can take forever. On the other hand, using one task for each row would spend more time scheduling the tasks than doing the work itself. As a result, proper data partitioning is one of the fundamental requirements for efficient processing. Ray Datasets allow you to efficiently split your data into partitions, or chunks, so that you can parallelize computation while keeping the overhead of scheduling tasks at an acceptable level. As the number of partitions increases, the maximum parallelism increases, but so does the scheduling and communication overhead.
Dask on Ray is an excellent choice for data preparation for machine learning, or scaling existing pandas code. Many initial Dask developers also worked on Pandas leading to a comparatively solid distributed pandas interface.
Note
|
Portions of this section are based on the DataFrame chapter in Scaling Python with Dask (O’Reilly). |
Dask is a parallel computing library that scales the existing Python ecosystem. You can think of Dask at a high and a low level:
- High-level collections: Dask provides high-level Array, Bag, and DataFrame collections that mimic NumPy, lists, and Pandas but can operate in parallel on datasets that don’t fit into memory. Dask’s high-level collections are alternatives to NumPy and Pandas for large datasets.
- Low-level schedulers: Dask provides dynamic task schedulers that execute task graphs in parallel. These execution engines power the high-level collections above but can also power custom, user-defined workloads.
Here we focus on Dask’s high level DataFrame collection as Ray has its own techniques for low-level scheduling.
Dask on Ray benefits from using Ray’s per-node/container shared-memory storage for data. This is especially important when doing operations like broadcast joins: in Dask, the same data needs to be stored in each worker process,[3] whereas in Ray it only needs to be stored once per node/container.
Warning
|
Unlike Ray, Dask is generally lazy, meaning it does not evaluate data until forced. This can make debugging a challenge as errors may appear several lines removed from their root cause. |
Most of the distributed components of Dask’s DataFrames use three core building blocks: map_partitions, reduction, and rolling. You mostly won’t need to call these functions directly; instead, you will use higher-level APIs, but understanding them and how they work is important to understanding how Dask works. shuffle is a critical building block of distributed DataFrames for reorganizing your data. Unlike the other building blocks, you may use it directly more frequently, as Dask is unable to abstract away partitioning.
Indexing into a DataFrame is one of the powerful features of pandas, but comes with some restrictions when moving into a distributed system like Dask. Since Dask does not, by default, track the size of each partition, positional indexing by row is not supported. You can use positional indexing into columns, as well as label indexing for columns or rows. Indexing is frequently used to filter the data to only have the components you need. We did this for the San Francisco COVID-19 data by looking at just the case rates for people of all vaccine statuses, as shown in Dask DataFrame Indexing.
link:examples/dask/Dask-Ch4-DataFrames.py[role=include]
If you truly need positional indexing by row, you can implement your own by computing the size of each partition and using this to select the desired partition subsets. This is very inefficient, so Dask avoids implementing it directly, forcing you to make an intentional choice before doing this.
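If you decide to do it anyway, a hedged sketch of that workaround (all names here are hypothetical) might look like:

[source,python]
----
import numpy as np

def iloc_row(ddf, i):
    # Compute the size of each partition (this triggers computation).
    sizes = ddf.map_partitions(len).compute()
    cum = np.cumsum(sizes)
    # Find which partition holds row i, and its offset within it.
    part = int(np.searchsorted(cum, i, side="right"))
    offset = i if part == 0 else i - cum[part - 1]
    return ddf.get_partition(part).compute().iloc[[offset]]
----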
As mentioned in the previous chapter, shuffles are expensive. The primary causes of the expensive nature of shuffles are the comparative slowness of network speed to reading data from memory and serialization overhead. These costs scale as the amount of data being shuffled increases, so Dask has some techniques to reduce the amount of data being shuffled. These techniques depend on certain data properties or on the operation being performed.
Note
|
While understanding shuffles is important for performance, feel free to skip this section if your code is working well enough. |
One situation which can trigger the need for a shuffle is a rolling window, where at the edges of a partition your function needs some records from its neighbors. Dask DataFrame has a special map_overlap function where you can specify a "look-after"[4] and "look-before"[5] window of rows to transfer (either an integer or a time delta). The simplest example taking advantage of this is a rolling average.
link:examples/dask/Dask-Ch4-DataFrames.py[role=include]
This allows Dask to transfer only the data needed. For this implementation to work correctly your minimum partition size will need to be larger than your largest window.
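A sketch of what this looks like, assuming a DataFrame indexed by date and a hypothetical case_rate column:

[source,python]
----
import pandas as pd

# Trailing 7-day rolling mean; only the overlap rows are transferred.
rolling_avg = ddf["case_rate"].map_overlap(
    lambda s: s.rolling("7D").mean(),
    before=pd.Timedelta(days=7),  # rows to fetch from the previous partition
    after=pd.Timedelta(days=0),   # no look-after needed for a trailing window
)
----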
Warning
|
Dask’s Rolling Windows will not cross multiple partitions. If your DataFrame is partitioned in such a way that the look-after or look-back is greater than the length of the neighbor’s partition, the results will either fail or be incorrect. Dask validates this for timedelta look-after, but no such checks are performed for look-backs or integer look-after. |
Aggregations are another special case that can reduce the amount of data that needs to be transferred over the network. Aggregations are anything where you combine records. If you are coming from a map/reduce or Spark background, reduceByKey is the classic aggregation. Aggregations can either be "by key" or global across an entire DataFrame.
To aggregate by key, you first need to call groupby with the column(s) representing the key, or a keying function to aggregate on. For example, calling df.groupby("PostCode") groups your DataFrame on postal code, and calling df.groupby(["PostCode", "SicCodes"]) uses a combination of columns for grouping. Function-wise, many of the same pandas aggregates are available, but the performance of aggregates in Dask is very different than with local pandas DataFrames.
Tip
|
If you’re aggregating by partition key, Dask can compute the aggregation without needing a shuffle. |
The first way to speed up your aggregations is to reduce the columns that you are aggregating on, since the fastest data to process is no data. Additionally, when possible, doing multiple aggregations at the same time reduces the number of times the same data needs to be shuffled, meaning if you need to compute the average and the max, you should compute both at the same time.
link:examples/dask/Dask-Ch4-DataFrames.py[role=include]
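A sketch of that pattern, reusing the PostCode and NumEmployees columns from the book's running example:

[source,python]
----
# One groupby and one shuffle produce both aggregates at once.
stats = df.groupby("PostCode")["NumEmployees"].agg(["mean", "max"])
print(stats.compute())
----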
For distributed systems like Dask, if an aggregation can be partially evaluated and then merged you can potentially combine some records pre-shuffle. Not all partial aggregations are created equal. What matters with partial aggregations is how much data is reduced when merging values with the same key.
The most efficient aggregations take a sub-linear amount of space regardless of the number of records. Some of these can take constant space such as sum, count, first, minimum, maximum, mean, and standard deviation. For more complicated tasks, like quantiles and distinct counts, there are also sub-linear approximation options. These approximation options can be great, as exact answers can require linear growth in storage.
Some aggregation functions are not sublinear in growth, but "tend to" or "might" not grow too quickly. Counting the distinct values is in this group, but if all your values are unique then there is no space-saving.
To take advantage of efficient aggregations, you need to use a built-in aggregation from Dask, or write your own using Dask’s Aggregation class. Whenever you can, use a built-in; built-ins not only require less effort but are also often faster. Not all of the pandas aggregates are directly supported in Dask, so sometimes your only choice is to write your own aggregate.
If you choose to write your own aggregate, you have three functions to define: chunk for handling each group-partition/chunk, agg to combine the results of chunk between partitions, and (optionally) finalize to take the result of agg and produce a final value.
The fastest way to understand how to use partial aggregation is by looking at an example that uses all three functions. Using weighted average as an example, with Dask Custom Aggregate, can help you think of what is needed for each function. The first function needs to compute the weighted values and the weights. The agg function combines these by summing each side part of the tuple. Finally, the finalize function divides the total by the weights.
link:examples/dask/Dask-Ch4-DataFrames.py[role=include]
In some cases, like a pure summation, you don’t need to do any post-processing on agg’s output, so you can skip the finalize function.
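To illustrate the three-function shape with a simpler case, here is a sketch of a custom mean built with Dask's Aggregation class; the weighted-average example follows the same pattern:

[source,python]
----
import dask.dataframe as dd

# chunk runs per partition, agg merges the per-partition tuples,
# and finalize turns the merged values into the final answer.
custom_mean = dd.Aggregation(
    name="custom_mean",
    chunk=lambda g: (g.count(), g.sum()),
    agg=lambda count, total: (count.sum(), total.sum()),
    finalize=lambda count, total: total / count,
)

df.groupby("PostCode")["NumEmployees"].agg(custom_mean)
----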
Not all aggregations must be "byKey"; you can also compute aggregations across all rows. Dask’s custom aggregation interface, however, is only exposed with "byKey" operations.
Dask’s built-in full-DataFrame aggregations use a lower-level interface called apply_concat_apply for partial aggregations. Rather than learn two different APIs for partial aggregations, we prefer to do a static groupby by providing a constant grouping function. This way we only have to know one interface for aggregations. You can use this to see what the aggregate COVID-19 numbers are across the DataFrame, as shown in Dask DataFrame Aggregate Entire DataFrame.
link:examples/dask/Dask-Ch4-DataFrames.py[role=include]
If a built-in aggregation exists, it will likely be better than anything we may be able to write. Sometimes a partial aggregation is partially implemented, as in the case of Dask’s HyperLogLog: it is implemented only for full DataFrames. You can often translate simple aggregations using apply_concat_apply (aka aca) by copying the chunk function, using the combine parameter for agg, and using the aggregate parameter for finalize. This is shown by porting Dask’s HyperLogLog implementation in Dask DataFrame Port apply_concat_apply aggregate.
link:examples/dask/Dask-Ch4-DataFrames.py[role=include]
Slow/inefficient aggregations[6] use storage proportional to the records being aggregated. Examples from this slow group include making a list and naively computing exact quantiles[7]. With these slow aggregates, there is no benefit to using Dask’s Aggregation class over the apply API, which you may wish to use for simplicity. For example, if you just wanted a list of employer IDs by postal code, rather than having to write three different functions you could use a one-liner like df.groupby("PostCode")["EmployerId"].apply(lambda g: list(g)). apply is implemented as a full shuffle, which is covered in the next section.
Warning
|
Dask is unable to apply partial aggregations when you use the |
Sorting is inherently expensive in distributed systems because it most often requires a full shuffle. Full shuffles are sometimes an unavoidable part of working in Dask. Counter-intuitively, while full shuffles are themselves slow, you can use them to speed up future operations that are all happening on the same grouping key(s). As mentioned in the aggregation section, one of the ways a full shuffle is triggered is by using the apply method when partitioning is not aligned.
You will most commonly use full-shuffles to repartition your data. It’s important to have the right partitioning when dealing with aggregations, rolling windows, or look-ups/indexing. As discussed in the rolling window section, Dask cannot do more than one partition worth of look-ahead or look-behind, so having the right partitioning is required to get correct results. For most other operations, having incorrect partitioning will slow down your job.
Dask has three primary methods for controlling the partitioning of a DataFrame: set_index, repartition, and shuffle. You use set_index for situations where the partitioning is being changed to a new key/index. repartition keeps the same key/index but changes the splits. repartition and set_index take similar parameters, with repartition not taking an index key name. shuffle is a bit different since it does not produce a "known" partitioning scheme that operations like groupby can take advantage of.
The first step of getting the right partitioning for your DataFrame is to decide if you want an index. Indexes are useful when filtering data, indexing, grouping, and for pretty much any other "by key" type of operation. One such by-key operation is a groupby, where the column being grouped on can be a good candidate for the key. If you use a rolling window over a column, then that column must be the key, which makes choosing the key relatively easy. Once you’ve decided on an index, you can call set_index with the column name of the index (e.g., set_index("PostCode")). This will, under most circumstances, result in a shuffle, so it’s a good time to size your partitions.
Tip
|
If you’re unsure what the current key used for partitioning is, you can check the |
Once you’ve chosen your key, the next question is how to "size" your partitions. The advice in Partitioning generally applies here: shoot for enough partitions to keep each machine busy, but keep in mind the general sweet spot of 100 MB to 1 GB. Dask generally computes pretty even splits if you give it a target number of partitions[8]. Thankfully, set_index will also take npartitions. To repartition the data by postal code with 10 partitions, you would call set_index("PostCode", npartitions=10); otherwise, Dask will default to the number of input partitions.
If you plan to use rolling windows, you will likely need to ensure you have the right size (in terms of key range) covered in each partition. To do this as part of set_index, you would need to compute your own divisions to ensure each partition has the right range of records present. Divisions are specified as a list starting from the minimal value of the first partition up to the maximum value of the last partition. Each value in between is a "cut" point between the pandas DataFrames that make up the Dask DataFrame. To make a DataFrame with partitions [0, 100), [100, 200), [200, 300), [300, 500] you would write df.set_index("NumEmployees", divisions=[0, 100, 200, 300, 500]). Similarly, a date-range index supporting a rolling window of up to seven days, from the start of the pandemic to this writing, is shown in Dask DataFrame Rolling Window with set_index.
link:examples/dask/Dask-Ch4-DataFrames.py[role=include]
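A sketch of what such date divisions might look like (the column name and date endpoints are hypothetical):

[source,python]
----
import pandas as pd

# A cut point every 7 days; the last division must cover the
# maximum date actually present in the data.
divisions = pd.date_range(
    start="2020-03-01", end="2022-04-01", freq="7D"
).tolist()
df = df.set_index("specimen_collection_date", divisions=divisions)
----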
Warning
|
Dask, including rolling time windows, assumes that your partition index is monotonically increasing.[9] |
So far you’ve had to specify the number of partitions or the specific divisions, but you might be wondering if Dask can just figure this out itself. Thankfully, Dask’s repartition function has the ability to pick divisions from a target size. However, doing this has a non-trivial cost, as Dask must evaluate the DataFrame as well as perform the repartition itself.
link:examples/dask/Dask-Ch4-DataFrames.py[role=include]
Warning
|
Dask’s set_index has a similar |
As you’ve seen at the start of this chapter, when writing a DataFrame each partition is given its own file, but sometimes this can result in files that are too big or too small. Some tools can only accept one file as input, so you need to repartition everything into a single partition. Other times, the data storage system is optimized for certain file sizes, like HDFS’s default block size of 128 MB. The good news is you can use repartition or set_index to get your desired output structure.
map_partitions applies a function to each of the partitions’ underlying pandas DataFrames, and the result of that function is also a pandas DataFrame. Functions implemented with map_partitions are embarrassingly parallel[10] since they don’t require any inter-worker transfer of data. map_partitions implements map and many row-wise operations. If there is a row-wise operation that you want to use that you find missing, you can implement it yourself, as shown with Dask DataFrame fillna.
link:examples/dask/Dask-Ch4-DataFrames.py[role=include]
You aren’t limited to calling pandas’ built-ins as in the above example. Provided that your function takes and returns a DataFrame, you can do pretty much anything you want inside of map_partitions.
The full pandas API is too long to cover in the scope of this chapter, but if a function can operate on a row-by-row basis without any knowledge of the rows before or after, it may already be implemented in Dask DataFrames using map_partitions. If not, you can also implement it yourself using the pattern from the Dask DataFrame fillna example, sketched below.
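For instance (the function name is hypothetical):

[source,python]
----
def my_fillna(df):
    # Plain pandas code, run independently on each partition.
    return df.fillna(0)

ddf2 = ddf.map_partitions(my_fillna)
----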
When using map_partitions on a DataFrame, you can change anything about each row, including the key that it is partitioned on. If you are changing the values in the partition key, you must either clear the partitioning information on the resulting DataFrame with clear_divisions() or specify the correct indexing with set_index, which you’ll learn more about in the next section.
Warning
|
Incorrect partitioning information can result in incorrect results, not just exceptions, as Dask may miss relevant data. |
Pandas and Dask have four common functions for combining DataFrames. At the root is the concat function, which allows joining DataFrames on any axis. Concatenating DataFrames is generally slower in Dask since it involves inter-worker communication. The other three functions are join, merge, and append, which all implement special cases for common situations on top of concat and have slightly different performance considerations. Having good divisions/partitioning, in terms of key selection and number of partitions, makes a huge difference when working on multiple DataFrames.
Dask’s join and merge functions take most of the standard pandas arguments along with an extra optional one, npartitions. npartitions specifies a target number of output partitions, but it is only used for hash-based joins (which you’ll learn about in the join type section). Both automatically repartition your input DataFrames if needed. This is great, as you might not know the partitioning, but since repartitioning can be slow, explicitly using the lower-level concat function when you don’t expect any partitioning changes to be needed can help catch performance problems early. Dask’s join takes more than two DataFrames at a time only when doing a "left" or "outer" join type.
Tip
|
Dask has special logic to speed up multi-DataFrame joins, so in most cases rather than do |
When you combine or concat DataFrames by row,[11] the performance depends on whether the divisions of the DataFrames being combined are "well ordered." We call the divisions of a series of DataFrames well ordered if all the divisions are known and the highest division of the previous DataFrame is below the lowest division of the next. If any input has an unknown division, Dask will produce an output without known partitioning. With all known partitions, Dask treats row-based concatenations as a metadata-only change and will not perform any shuffle. This requires that there be no overlap between the divisions. There is also an extra interleave_partitions parameter, which will change the join type for row-based combinations to one without the input partitioning restriction and result in a known partitioner.
Dask’s column-based `concat`[12] also has restrictions around the divisions/partitions of the DataFrames it is combining. Dask’s version of concat only supports "inner" or full "outer" join, not "left" or "right." Column-based joins require that all inputs have known partitioners and also result in a DataFrame with known partitioning. Having a known partitioner can be useful for subsequent joins.
Warning
|
Dask’s concat, when operating by row, should not be called with unknown divisions as it will likely return incorrect results.[13]
Dask uses four different techniques, hash, broadcast, partitioned, and stack_partitions, to combine DataFrames, each with very different performance. Dask chooses the technique based on the indexes, divisions, and requested join type (e.g., outer/left/inner). The three column-based join techniques are hash joins, broadcast joins, and partitioned joins. When doing row-based combinations (e.g., append), Dask has a special technique called stack_partitions that is extra fast. It’s important that you understand the performance of each of these techniques and the conditions that will cause Dask to pick each approach.
Hash joins are the default that Dask uses when no other join technique is suitable. Hash joins involve shuffling the data for all the input DataFrames to partition on the target key. They use the hash values of keys, which results in a DataFrame that is not in any particular order. As such, the results of hash joins do not have any known divisions.
Broadcast joins are ideal for joining large DataFrames with small DataFrames. In a broadcast join, Dask takes the smaller DataFrame and distributes it to all of the workers. This means that the smaller DataFrame must be able to fit in memory. To tell Dask that a DataFrame is a good candidate for broadcasting, you make sure it is all stored in one partition, e.g., by calling repartition(npartitions=1).
Partitioned joins happen when combining DataFrames along an index where the partitions/divisions are known for all the DataFrames. Since the input partitions are known, Dask is able to align the partitions between the DataFrames, involving less data transfer as each output partition draws on a smaller-than-full set of inputs.
Since partitioned and broadcast joins are faster, it can be worth doing some work to help Dask. For example, concatenating several DataFrames with known and aligned partitions/divisions and one DataFrame that is unaligned will result in an expensive hash join. Instead, try to either set the index and partitioning on the remaining DataFrame, or join the less expensive DataFrames first and then perform the expensive join after.
stack_partitions is different from all of the others since it doesn’t involve any movement of data. Instead, the resulting DataFrame’s partitions list is a union of the upstream partitions from the input DataFrames. Dask uses stack_partitions for most row-based combinations, except when all of the input DataFrame divisions are known, they are not well ordered, and you ask Dask to interleave_partitions. stack_partitions is only able to provide known partitioning in its output when the input divisions are known and well ordered. If all of the divisions are known but not well ordered and you set interleave_partitions, Dask will use a partitioned join instead. While this approach is comparatively inexpensive, it is not free, and it can result in an excessively large number of partitions, to the point where you end up needing to repartition anyway.
Dask’s DataFrame implements most, but not all, of the pandas DataFrame API. Some of the pandas API is not implemented in Dask due to the development time involved, and other parts to avoid exposing an API that would be unexpectedly slow.
Sometimes the API is just missing small parts, as both pandas and Dask are under active development. An example of this is the split function from [ds_hello]. In local pandas, instead of doing split().explode(), you could have called split(expand=True). Some of these can be excellent places where you can get involved and contribute to the Dask project if you are interested.
Some libraries do not parallelize as well as others. In these cases, a common approach is to try and filter or aggregate the data down enough that it can be represented locally and then apply the local libraries to the data. For example, with graphing it’s common to pre-aggregate the counts or take a random sample and graph the result.
While much of the Pandas DataFrame API will work out-of-the-box, before you swap in Dask DataFrame it’s important to make sure you have good test coverage to catch the situations where it does not.
Usually, using Dask DataFrames will improve performance, but not always. Generally, smaller datasets will perform better in local pandas. As discussed, anything involving shuffles is generally slower in a distributed system than in a local one. Iterative algorithms can also produce large graphs of operations, which are slow to evaluate in Dask compared to traditional greedy evaluation.
Some problems are generally unsuitable for data-parallel computing. For example, writing out to a data store that has a single lock, where adding more parallel writers increases the lock contention, may be slower than having a single thread do the writing. In these situations, you can sometimes repartition your data or write individual partitions to avoid lock contention.
Dask’s lazy evaluation, powered by its lineage graph, is normally beneficial allowing it to combine steps automatically. However, when the graph gets too large Dask can struggle to manage it which often shows up as a slow driver process or notebook, and sometimes an out of memory exception. Thankfully, you can work around this by writing your DataFrame out and reading it back in. Generally, Parquet is the best format for doing this as it is space-efficient and self-describing so no schema inference is required.
For performance reasons, various parts of Dask DataFrames behave a little differently than local DataFrames:
- reset_index - the index will start back over at zero on each partition.
- kurtosis - does not filter out NaNs and uses scipy defaults.
- concat - instead of coercing category types, each category type is expanded to the union of all of the categories it is concatenated with.
- sort_values - Dask only supports single-column sorts.
- When joining more than two DataFrames at the same time, the join type must be one of outer or left.
Tip
|
If you are interested in going deeper with Dask, there are several Dask-focused books in active development, including Scaling Python with Dask by Holden Karau and Mika Kimmins, on which much of this DataFrame section is based.
Modin, like Dask DataFrame, is designed to largely be a plug-in replacement for pandas DataFrame. Modin DataFrames follow the same general performance as Dask DataFrames, with a few caveats. Modin offers less control over internals, which can limit performance for some applications. Since Modin and Dask DataFrame are sufficiently similar we won’t cover it here except to say it’s another option if Dask doesn’t meet your needs.
Modin is a new library designed to accelerate Pandas by automatically distributing the computation across all of the system’s available CPU cores. With that, Modin claims to be able to get nearly linear speedup to the number of CPU cores on your system for Pandas DataFrames of any size.
Since Modin on Ray is so similar to Dask DataFrame we’ve decided to skip repeating the examples from Dask on Ray as they would not change substantially.
If you’re working with an existing big data infrastructure, like Hive, Iceberg, HBase, or similar, Spark is an excellent choice. Spark has optimizations like filter push-down, which can dramatically improve performance. Spark has a more traditional "big data" DataFrame interface, and its strong suit is the data ecosystem of which it is a part. As a Java-based tool with a Python API, Spark plugs into much of the traditional "big data" ecosystem. Spark supports the widest array of formats and filesystems, making it an excellent choice for the initial stages of many pipelines. While Spark continues to add more pandas-like functionality, its DataFrames started from more of a SQL-inspired design. There are several options to learn about Spark, including the O’Reilly books Learning Spark, High Performance Spark, and Spark: The Definitive Guide.
Warning
|
Unlike Ray, Spark is generally lazy, meaning it does not evaluate data until forced. This can make debugging a challenge as errors may appear several lines removed from their root cause. |
Some tools are not well suited to distributed operation. Thankfully, provided your Dataset is filtered down small enough, you can convert it into a variety of local in-process formats. If the entire Dataset can fit in memory, to_pandas and to_arrow are the simplest ways to convert a Dataset to a local object. For larger objects, where each partition may fit in memory but the entire Dataset may not, the iter_batches function will give you a generator/iterator with which you can consume one partition at a time. iter_batches takes a batch_format parameter to switch between pandas and pyarrow. If possible, pyarrow is generally more efficient than pandas.
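A brief sketch of both paths (do_something is a hypothetical per-batch consumer):

[source,python]
----
# The whole Dataset must fit in driver memory for this call.
pandas_df = ds.to_pandas()

# Stream one batch/partition at a time for larger Datasets.
for batch in ds.iter_batches(batch_format="pyarrow"):
    do_something(batch)  # hypothetical per-batch processing
----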
In addition to allowing you to move data between different tools, Ray also has some built-in operations. Ray Datasets do not attempt to match any particular existing API, but rather expose basic building blocks you can use when the existing libraries do not meet your needs.
Ray’s Dataset has support for a number of basic data operations. Ray Datasets do not aim to expose a pandas-like API; rather, they focus on providing basic primitives to build on top of. The Dataset API is functionally inspired, along with partition-oriented functions. Ray also recently added groupbys and aggregates.
The core building block of most of the Dataset operations is map_batches. By default, map_batches executes the function you provide across the blocks/batches that make up a Dataset and uses the results to make a new Dataset. map_batches is used to implement filter, flat_map, and map. You can see the flexibility of map_batches by looking at the wordcount example rewritten to directly use map_batches and also drop any word that only shows up once.
link:examples/ray_examples/data/Ray-DataSets.py[role=include]
map_batches takes a number of parameters to customize its behavior. For stateful operations, you can change the compute strategy from its default of tasks to actors. The previous example used the default format, which is Ray’s internal format, but you can also convert the data to pandas or pyarrow. You can see this in Ray map_batches with pandas to update a column, where we have Ray convert the data to pandas for us.
link:examples/ray_examples/data/Ray-DataSets.py[role=include]
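As a sketch of the same idea (the "value" column is hypothetical):

[source,python]
----
# Each batch arrives as a pandas DataFrame, and one is returned.
ds2 = ds.map_batches(
    lambda df: df.assign(value=df["value"] * 2),
    batch_format="pandas",
)
----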
Tip
|
The result you return must be a list, |
Ray Datasets do not have a built-in way to specify additional libraries to be installed. You can use map_batches along with a task to accomplish this, as shown in Ray map_batches with extra libraries, where we install extra libraries to parse the HTML.
link:examples/ray_examples/data/Ray-DataSets.py[role=include]
For operations needing shuffles, Ray has a GroupedDataset, which behaves a bit differently. Unlike the rest of the Dataset API, groupby is lazily evaluated in Ray. groupby takes a column name or function; records with the same value will be aggregated together. Once you have the GroupedDataset, you can then pass multiple aggregates to the aggregate function. Ray’s AggregateFn class is conceptually similar to Dask’s Aggregation class, except it operates by row. Since it operates by row, you need to provide an init function for when a new key value is found. Instead of chunk for each new chunk, you provide accumulate to add each new element. You still provide a method of combining the aggregators, called merge instead of agg, and both have the same optional finalize. To understand the difference, we rewrote the Dask weighted-average example for Ray in Ray weighted average aggregation.
link:examples/ray_examples/data/Ray-DataSets.py[role=include]
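A sketch of what such an aggregation can look like; the value, weight, and group field names are hypothetical, and exact AggregateFn parameter names vary across Ray versions:

[source,python]
----
from ray.data.aggregate import AggregateFn

# The accumulator is a (weighted_sum, total_weight) tuple.
weighted_avg = AggregateFn(
    init=lambda key: (0.0, 0.0),
    accumulate=lambda acc, row: (
        acc[0] + row["value"] * row["weight"],
        acc[1] + row["weight"],
    ),
    merge=lambda a, b: (a[0] + b[0], a[1] + b[1]),
    finalize=lambda acc: acc[0] / acc[1],
    name="weighted_avg",
)

ds.groupby("group").aggregate(weighted_avg)
----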
Note
|
Full dataset aggregation is implemented using |
Ray’s parallelism control does not have the same flexibility as indexes in Dask or partitioning in Spark. You can control the target number of partitions, but not how the data is spread out.
Note
|
Ray does not currently take advantage of the concept of "known partitioning" to minimize shuffles. |
Ray Datasets are built using the tools you have been working with in the previous chapters. Ray splits each Dataset into many smaller components. These smaller components are called both blocks and partitions inside of the Ray code. Each partition contains an Arrow Dataset representing a slice of the entire Ray Dataset. Since Arrow does not support all of the types from Ray, if you have unsupported types, each partition also contains a list of those unsupported items.
The data inside of each dataset is stored in the standard Ray object store. Each partition is stored as a separate object, since Ray is not able to split up individual objects. This also means that you can use the underlying Ray objects as parameters to Ray remote functions and actors. The Dataset contains references to these objects as well as schema information.
Tip
|
Since the Dataset contains the schema information, loading a dataset blocks on the first partition so that the schema information can be determined. The remainder of the partitions are eagerly loaded, but in a non-blocking fashion like the rest of Ray’s operations. |
In keeping with the rest of Ray, Datasets are immutable. When you want to do an operation on a Dataset, you apply a transformation, like filter, join, or map, and Ray returns a new Dataset with the result.
Ray Datasets can use either tasks (aka remote functions) or actors for processing transformations. Some libraries built on top of Ray Datasets, like Modin, depend on using actor processing so they can implement certain ML tasks involving state.
Ray’s transparent handling of moving data between tools makes it an excellent choice for building end-to-end machine learning pipelines, compared with traditional techniques where the communication barrier between tools is much higher. Two separate frameworks, Modin and Dask, both offer a pandas-like experience on top of Ray Datasets, making it easy to scale existing data science workflows. Spark on Ray (known as RayDP) provides an easy integration path for those working in organizations with existing big data tools. In this chapter, you have learned how to effectively process data with Ray to power your machine learning and other needs. In the next chapter, you will learn how to use Ray to power machine learning.