
Migrating Existing Analytic Engineering

Many users will already have analytic work that is currently deployed and that they want to migrate over to Dask. This chapter will discuss the considerations, challenges, and experiences of users making the switch. The main migration pathway explored in the chapter is moving an existing big data engineering job from another distributed framework, such as Spark, into Dask.

Why Dask?

Here are some reasons to consider migrating an existing job implemented in pandas, or in distributed libraries like PySpark, over to Dask:

Python and PyData stack

Many data scientists and developers prefer using a Python-native stack, where they don’t have to switch between languages or styles.

Richer ML integrations with Dask APIs

Dask's futures, delayed, and ML integrations require less glue code for the developer to maintain, and the more flexible task graph management Dask offers brings performance improvements.

Fine-grained task management

Dask's task graph is generated and maintained at runtime, and users can access the task dictionary synchronously.

Debugging overhead

Some developer teams prefer the debugging experience in Python, as opposed to mixed Python and Java/Scala stack traces.

Development overhead

Development with Dask can be done easily on the developer's laptop, as opposed to needing to connect to a powerful cloud machine in order to experiment.

Management UX

Dask's visualization tools tend to be more visually pleasing and intuitive to reason about, with native GraphViz rendering for task graphs.

These are not all of the benefits, but if any of them speak to you, it’s probably worth investing the time to consider moving the workload to Dask. There are always trade-offs involved, so the next section will look at some of the limitations, followed by a road map to give you an idea of the scale of work involved in moving to Dask.

Limitations of Dask

Dask is relatively new, and the use of the Python data stack to perform large-scale extract, transform, load (ETL) operations is also fairly new. Dask's limitations mainly arise from the fact that the PyData stack has traditionally not been used to run large-scale data workloads. At the time of writing, the system has some limits, but developers are actively addressing them, and many of these gaps will be filled in. Some of the fine-grained considerations you should keep in mind are as follows:

Parquet scale limits

If Parquet data exceeds 10 TB in scale, there are issues at the fastparquet and PyArrow level that slow Dask down, and metadata management overhead can be overwhelming.

ETL workloads with Parquet files at 10 TB and beyond that include mutations, such as appends and updates, run into consistency issues.

Weak data lake integrations

The PyData stack has traditionally not engaged much with the big data world, and integrations with data lake management tools, such as Apache Iceberg, are missing.

High-level query optimization

Spark users will be familiar with the Catalyst optimizer, which pushes down predicates to optimize the physical work on the executors. This optimization layer is currently missing in Dask. Spark did not have Catalyst in its early years either, and work is in progress to build an equivalent for Dask.

Any list of limitations for a rapidly developing project like Dask may be out of date by the time you read it, so if any of these are blockers for your migration, make sure to check Dask’s status tracker.

Migration Road Map

While no engineering work is a linear process, it's always a good idea to have a road map in mind. We've laid out an example of migration steps as a non-exhaustive list of items a team might want to think through when planning its move:

  • What kind of machines and containerization framework will we want to deploy Dask on, and what are their pros and cons?

  • Do we have tests to ensure our migration correctness and our desired goals?

  • What type of data is Dask able to ingest, and at what scale, and how does that differ from other platforms?

  • How does Dask's computation framework work, and how do we think in Dask-native, Pythonic ways to accomplish the task?

  • How would we monitor and troubleshoot the code at runtime?

We'll start by looking at the types of clusters, which go hand in hand with the deployment framework, since deployment is often one of the issues that requires collaboration with other teams or organizations.

Types of Clusters

If you are considering moving your analytic engineering job, you probably have a system that's provisioned for you by your organization. Dask is supported in many commonly used deployment and development environments, some of which allow more flexibility in scaling, dependency management, and support of heterogeneous worker types. We have used Dask in academic environments, on commodity cloud, and directly over VMs/containers; we've detailed the pros and cons of some well-used and supported environments in [appA].

Deploying Dask on YARN with Dask-Yarn and skein shows an example of a YARN deployment. More examples and in-depth discussion can be found in [ch12].

Example 1. Deploying Dask on YARN with Dask-Yarn and skein
link:./examples/dask/Dask-Ch10_porting.py[role=include]
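A minimal Dask-Yarn deployment might look like the following sketch; the packaged environment name and the resource sizes are placeholders you would replace with your own.

from dask_yarn import YarnCluster
from dask.distributed import Client

# The environment archive is a conda/virtualenv environment packaged with
# conda-pack or venv-pack; the name and resource sizes here are placeholders.
cluster = YarnCluster(
    environment="environment.tar.gz",
    worker_vcores=2,
    worker_memory="4GiB",
)
cluster.scale(4)          # request four worker containers from YARN
client = Client(cluster)  # point the Dask client at the YARN-backed cluster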

If your organization has multiple clusters that are supported, choosing one where you can self-serve dependency management, like Kubernetes, is beneficial.

For high-performance computing deployments using job queuing systems such as PBS, Slurm, MOAB, SGE, LSF, and HTCondor, you should use Dask-jobqueue, as shown in Deploying Dask using job-queue over Slurm.

Example 2. Deploying Dask using job-queue over Slurm
link:./examples/dask/Dask-Ch10_porting.py[role=include]
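A comparable Dask-jobqueue sketch for Slurm is shown below; the queue name, resource requests, and walltime are placeholder assumptions to be matched to your site's policies.

from dask_jobqueue import SLURMCluster
from dask.distributed import Client

# Queue name, per-job resources, and walltime are placeholders.
cluster = SLURMCluster(
    queue="regular",
    cores=24,
    memory="128GB",
    walltime="01:00:00",
)
cluster.scale(jobs=2)     # submit two Slurm jobs, each hosting Dask workers
client = Client(cluster)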

You likely have a shared filesystem already set up by your organization's admin. Enterprise users may already be used to robustly provisioned distributed data sources running on HDFS, or on blob storage like S3, both of which Dask works with seamlessly (see Reading and writing to blob storage using MinIO). Dask also integrates well with networked filesystems.

Example 3. Reading and writing to blob storage using MinIO
link:./examples/dask/Dask-Ch10_porting.py[role=include]
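Because MinIO speaks the S3 API, Dask can reach it through s3fs by overriding the endpoint; in the sketch below the endpoint URL, credentials, bucket, and paths are all placeholders.

import dask.dataframe as dd

# Endpoint, credentials, bucket, and paths are placeholders.
minio_storage_options = {
    "key": "YOUR_ACCESS_KEY",
    "secret": "YOUR_SECRET_KEY",
    "client_kwargs": {"endpoint_url": "http://localhost:9000"},
}

df = dd.read_parquet("s3://my-bucket/input/",
                     storage_options=minio_storage_options)
df.to_parquet("s3://my-bucket/output/",
              storage_options=minio_storage_options)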

We found that one of the surprisingly useful use cases is connecting directly to network storage such as NFS or FTP. When working with an academic dataset that's large and clunky to work with (like a neuroimaging dataset hosted directly by another organization), we could connect straight to the source filesystem. When using Dask this way, you should test out and consider network timeout allowances. Also note that, as of this writing, Dask does not have a connector to data lakes such as Iceberg.
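As a rough sketch of that network storage pattern, reading over FTP might look like the following; the host, path, and timeout value are placeholders, and the available options depend on the fsspec backend in use.

import dask.dataframe as dd

# Host, path pattern, and timeout are placeholders; options are passed
# through to the underlying fsspec FTP filesystem.
df = dd.read_csv(
    "ftp://ftp.example.org/pub/data/records-*.csv",
    storage_options={"timeout": 120},
)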

Development: Considerations

Translating existing logic to Dask is a fairly intuitive process. The following sections present some considerations if you're coming from libraries such as R, pandas, and Spark, and note how Dask might differ from them. Some of these differences result from moving from a different low-level implementation, such as Java, and others result from moving from single-machine code to a scaled implementation, as when you're coming from pandas.

DataFrame performance

If you have a job that you are already running on a different platform, it's likely you are already using a columnar storage format, like Parquet, and reading it at runtime. The data type mapping from Parquet to Python is inherently imprecise. It's a good idea to check data types when reading in any data at runtime, and the same applies to DataFrames. If type inference fails, a column will default to object. Once you inspect and determine that the type inference is imprecise, specifying data types can speed up your job a lot. Additionally, it's always a good idea to check strings, floating-point numbers, datetimes, and arrays. If type errors arise, keeping in mind the upstream data sources and their data types is a good start. For example, if the Parquet file is generated from protocol buffers, then depending on which encode and decode engine was used, differences in null checks, floats, doubles, and mixed-precision types can be introduced in that stack.
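As a quick illustration of that check, with a hypothetical path and column names, you can inspect the inferred types right after reading and correct the imprecise ones:

import dask.dataframe as dd

df = dd.read_parquet("s3://my-bucket/events/")  # placeholder path
print(df.dtypes)  # columns whose inference failed show up as object

# Correct imprecise or failed inference explicitly; names and types are examples.
df = df.astype({"user_id": "int64", "amount": "float64"})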

When reading a large file from cloud storage into a DataFrame, it can be useful to select the columns you need at the DataFrame read stage. Users of other platforms like Spark will be familiar with predicate pushdown, where even if you don't specify the desired columns, the engine optimizes the plan and reads only the columns required for the computation. Dask doesn't quite provide that optimization yet.
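A small sketch of selecting columns at read time, again with a placeholder path and column names:

import dask.dataframe as dd

# Ask for only the columns the computation needs rather than relying on the
# engine to prune them for you.
df = dd.read_parquet(
    "s3://my-bucket/events/",
    columns=["user_id", "amount", "event_time"],
)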

Setting smart indices early in the transformation of your DataFrame, prior to a complex query, can speed things up. Be aware that multi-indexing is not yet supported by Dask. A common workaround for a multi-indexed DataFrame from another platform is to map the index to a single concatenated column. For example, a simple workaround when coming from a non-Dask columnar dataset, like a pandas DataFrame with a pd.MultiIndex over two columns, say col1 and col2, is to introduce a new column, col1_col2, in the Dask DataFrame and use it as the index.
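A sketch of that workaround, assuming a Dask DataFrame df with the hypothetical columns col1 and col2:

# Concatenate the two former index columns into one and index on it.
df["col1_col2"] = df["col1"].astype(str) + "_" + df["col2"].astype(str)
df = df.set_index("col1_col2")  # single index standing in for the multi-index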

During the transform stage, calling .compute() coalesces a large distributed Dask DataFrame into a single in-memory pandas DataFrame, which must fit in RAM. If it does not, you may encounter problems. On the other hand, if you have filtered input data from 100 GB down to 10 GB (say your RAM is 15 GB), it is probably a good idea to reduce the parallelism after the filter operation by invoking .compute(). You can check your DataFrame's memory usage by invoking df.memory_usage(deep=True).sum() to determine whether this is the right call. Doing this can be particularly useful if, after the filter operation, you have a complex and expensive shuffle operation, such as a .join() with a new, larger dataset.
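A sketch of that decision, assuming a Dask DataFrame df and a hypothetical amount column used for the filter:

filtered = df[df["amount"] > 100]  # hypothetical filter from ~100 GB to ~10 GB

# memory_usage is lazy in Dask, so the summed size itself needs a .compute()
size_bytes = filtered.memory_usage(deep=True).sum().compute()

if size_bytes < 10 * 1024**3:      # comfortably below the ~15 GB of RAM
    local_df = filtered.compute()  # collapse into a single in-memory pandas DataFrame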

Tip

Dask DataFrames are not value-mutable in the way that pandas DataFrame users might be familiar with. Since in-memory modification of a particular value is not possible, the only way to change a value is a map operation over the whole column of the DataFrame. If in-memory value changes are something you have to do often, it is better to use an external database.
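A minimal sketch of that column-wide approach, with hypothetical column names and values:

# There is no cell-level assignment, so a single "cell" change is expressed
# as an operation over the whole column; order_id and price are hypothetical.
df["price"] = df["price"].mask(df["order_id"] == 12345, 19.99)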

Porting SQL to Dask

Dask does not offer a SQL engine natively, although it does offer native options for reading from a SQL database. There are a number of different libraries you can use to interact with an existing SQL database, and to treat a Dask DataFrame as a SQL table and run SQL queries directly (see Reading from a Postgres database). Some even allow you to build and serve ML models directly, using SQL ML syntax similar to that of Google's BigQuery ML. In Examples #Dask_sql_linear_regression and #Dask_sql_XGBClassifier, we will show the use of Dask's native read_sql() function and running SQL ML using Dask-SQL.

Example 4. Reading from a Postgres database
link:./examples/dask/Dask-Ch10_porting.py[role=include]
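A minimal sketch of reading a table in parallel with Dask's native reader; the connection string, table name, index column, and partition count are placeholders.

import dask.dataframe as dd

# The index column is used to split the table into parallel reads.
df = dd.read_sql_table(
    "trips",
    "postgresql://user:password@postgres-host:5432/mydb",
    index_col="trip_id",
    npartitions=16,
)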

FugueSQL provides SQL compatibility to the PyData stack, including Dask. The project is in its infancy but seems promising. The main advantage of FugueSQL is that the code is portable among pandas, Dask, and Spark, giving a lot more interoperability. FugueSQL can run its SQL queries using DaskExecutionEngine, or you can run FugueSQL queries over a Dask DataFrame you are already using. Alternatively, you can run a quick SQL query on a Dask DataFrame in your notebook. Running SQL over Dask DataFrame with FugueSQL shows an example of using FugueSQL in a notebook. The downside of FugueSQL is that it requires the ANTLR library, which in turn requires a Java runtime.

Example 5. Running SQL over Dask DataFrame with FugueSQL
from fugue_notebook import setup
import dask.dataframe as dd  # needed for read_parquet

setup(is_lab=True)

url = ('https://d37ci6vzurychx.cloudfront.net/trip-data/'
       'yellow_tripdata_2018-01.parquet')
df = dd.read_parquet(url)

%%fsql dask
tempdf = SELECT VendorID, AVG(total_amount) AS average_fare FROM df
GROUP BY VendorID

SELECT *
FROM tempdf
ORDER BY average_fare DESC
LIMIT 5
PRINT
   VendorID  average_fare
0         1     15.127384
1         2     15.775723

schema: VendorID:long, average_fare:double

An alternative method is to use the Dask-SQL library. This package uses Apache Calcite to provide the SQL parsing frontend and is used to query Dask DataFrames. With that library, you can pass most SQL-based operations to the Dask-SQL context, and they will be handled. The engine handles standard SQL inputs like SELECT and CREATE TABLE, but also ML model creation, with the CREATE MODEL syntax.
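A minimal Dask-SQL sketch, assuming an existing Dask DataFrame df with the VendorID and total_amount columns from the earlier example:

from dask_sql import Context

c = Context()
c.create_table("trips", df)  # register an existing Dask DataFrame as a SQL table

# The result of c.sql() is itself a lazy Dask DataFrame.
result = c.sql("""
    SELECT VendorID, AVG(total_amount) AS average_fare
    FROM trips
    GROUP BY VendorID
""")
result.compute()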

Deployment Monitoring

Like many other distributed libraries, Dask provides logs, and you can configure Dask logs to be sent to a storage system. The method will vary by the deployment environment, and whether Jupyter is involved.

The Dask client exposes the get_worker_logs() and get_scheduler_logs() methods, which can be accessed at runtime if desired. Additionally, similar to other distributed system logging, you can log events by topic, making them easily accessible by event types.

Basic logging by topic is a toy example of adding a custom log event to the client.

Example 6. Basic logging by topic
link:./examples/dask/Dask-Ch10_porting.py[role=include]
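A minimal sketch of what such topic-based logging can look like; the topic name and event payload are illustrative.

from dask.distributed import Client

client = Client()  # or connect to an existing scheduler

# Record a custom structured event under a topic of our choosing...
client.log_event("migration-test", {"stage": "ingest", "rows": 1_000_000})

# ...and read back everything logged under that topic.
events = client.get_events("migration-test")

# Worker and scheduler logs are also available at runtime.
worker_logs = client.get_worker_logs()
scheduler_logs = client.get_scheduler_logs()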

Structured logging on workers builds on the previous example but swaps the execution context to a distributed cluster setup, for potentially more complex, custom structured events. The Dask client listens for and accumulates these events, and we can inspect them. We start with a Dask DataFrame and then run a compute-heavy task. This example uses a softmax function, which is a common computation in many ML uses. A common ML dilemma is whether to use a more complex activation or loss function for accuracy, sacrificing performance (thereby running fewer training epochs but gaining a more stable gradient), or vice versa. To figure that out, we insert code to log custom structured events that time the compute overhead of that specific function.

Example 7. Structured logging on workers
link:./examples/dask/Dask-Ch10_porting.py[role=include]
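A sketch of the idea, using a stand-in timeseries DataFrame and a partition-local softmax over its x column; the topic name and payload fields are illustrative.

import time
import numpy as np
import dask.datasets
from dask.distributed import Client, get_worker

def timed_softmax(partition):
    """Apply a partition-local softmax to the x column and log how long it took."""
    start = time.time()
    x = partition["x"].to_numpy()
    exps = np.exp(x - x.max())               # numerically stable softmax
    partition = partition.assign(softmax=exps / exps.sum())
    try:
        # Emit a structured event from the worker under a custom topic.
        get_worker().log_event(
            "softmax-timing",
            {"duration_s": time.time() - start, "rows": len(partition)},
        )
    except ValueError:
        pass  # not running on a worker (e.g., during Dask's metadata inference)
    return partition

client = Client()                              # local distributed cluster
ddf = dask.datasets.timeseries()               # stand-in DataFrame with an x column
result = ddf.map_partitions(timed_softmax).compute()
timings = client.get_events("softmax-timing")  # inspect the accumulated events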

Conclusion

In this chapter you have reviewed the large questions and considerations of migrating existing analytic engineering work. You’ve also learned some of the feature differences of Dask compared to Spark, R, and pandas. Some features are not yet implemented by Dask, some are more robustly implemented by Dask, and others are inherent translational differences when moving a computation from a single machine to a distributed cluster. Since large-scale data engineering tends to use similar terms and names across many libraries, it’s often easy to overlook minute differences that lead to larger performance or correctness issues. Keeping them in mind will help you as you take your first journeys in Dask.