We’ve covered a few distributed system concepts briefly as needed in this book, but as you get ready to head out on your own, it’s a good idea to review some of the core concepts that Dask is built on. In this appendix, you will learn more about the key principles used in Dask and how they impact the code you write on top of Dask.
Testing is an often overlooked part of data science and data engineering. Some of our tools, like SQL and Jupyter notebooks, do not encourage testing or make it easy to test—but this does not absolve us of the responsibility to test our code. Data privacy concerns can add another layer of challenge, where we don’t want to store user data for testing, requiring us to put in the effort to create "fake" data for testing or break our code down into testable components where we don’t need user data.
We often perform some kind of manual testing while writing software or data tools. This can include simply running the tool and eyeballing the results to see if they look reasonable. Manual testing is time-consuming and not automatically repeatable, so while it is great during development, it is insufficient for long-lived projects.
Unit testing refers to testing individual units of code rather than the whole system together. This requires having your code be composed in different units, like modules or functions. While this is less common with notebooks, we believe that structuring your code for testability is a good practice to follow.
Writing unit tests for notebooks can be challenging; doctests are slightly easier to inline within a notebook. If you want to use traditional unit test libraries, the ipython-unittest magics let you inline your unit tests within your notebook.
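As a minimal sketch, a doctest can sit right next to the function it exercises and be run from a notebook cell; clean_name here is a hypothetical helper, not part of Dask:

def clean_name(name):
    """Normalize a customer name before joining.

    >>> clean_name("  Ada Lovelace ")
    'ada lovelace'
    >>> clean_name("")
    ''
    """
    return name.strip().lower()

# Running this in a cell (or at the bottom of a module) executes every
# doctest defined in the current namespace.
import doctest
doctest.testmod()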
Integration testing refers to testing how different parts of a system work together. It is often much closer to the real usage of your code, but it can be more complicated, as it involves setting up other systems to test against. You can (to an extent) use some of the same libraries for integration testing, but these tests tend to involve more setup and teardown work.[1] Integration testing is also more likely to be "flaky," since making sure that all of the different components your software needs are present in your test environment before starting the tests can be challenging.
Test-driven development involves taking the requirements or expectations of your code and writing tests and then writing the code after. For data science pipelines this can often be done by creating a sample input (sometimes called a golden set) and writing out what you expect the output to be. Test-driven development can be complicated, especially when integrating multiple data sources.
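As a rough sketch, a golden-set test for a hypothetical summarize_sales step pins down a tiny hand-written input and the output you expect, before (or alongside) writing the pipeline itself:

import pandas as pd
import pandas.testing as pdt

def summarize_sales(df):
    # Hypothetical pipeline step: total sales per customer.
    return (
        df.groupby("customer", as_index=False)["amount"]
        .sum()
        .rename(columns={"amount": "total"})
    )

def test_summarize_sales_golden():
    golden_input = pd.DataFrame(
        {"customer": ["a", "a", "b"], "amount": [10.0, 5.0, 7.0]}
    )
    expected = pd.DataFrame({"customer": ["a", "b"], "total": [15.0, 7.0]})
    pdt.assert_frame_equal(summarize_sales(golden_input), expected)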
While you don’t need to use test-driven development, we believe it’s important to make tests alongside the development of your data pipelines. Tests added after development are better than no tests, but in our experience the context you have during the development helps you create better tests (and validate your assumptions early on).
Property testing is a potentially great solution to the challenge of coming up with test data that covers all of the edge cases in terms of data that your code could trip up on. Instead of writing the traditional "for input A, result B is expected," you specify properties, like "if we have 0 customers, we should have 0 sales" or "all (valid) customers should have a fraud score after this pipeline."
Hypothesis is the most popular library for property testing in Python.
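A minimal sketch of the first property, using a hypothetical total_sales helper, might look like this:

from hypothesis import given
from hypothesis import strategies as st

def total_sales(order_amounts):
    # Hypothetical pipeline helper: revenue is the sum of the order amounts.
    return sum(order_amounts)

@given(st.lists(st.floats(min_value=0, max_value=1_000_000)))
def test_total_sales_properties(order_amounts):
    total = total_sales(order_amounts)
    assert total >= 0          # revenue is never negative
    if not order_amounts:
        assert total == 0      # no orders means no revenue

Hypothesis then generates many input lists for you, including edge cases like the empty list, rather than relying on the handful of examples we would think of ourselves.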
Testing notebooks is painful, which is unfortunate given their immense popularity. Generally, you can either have your testing outside of the notebook, which allows you the use of existing Python testing libraries, or try to put it inside the notebook.
The traditional option (besides ignoring testing) is to refactor the parts of your code you want to test into separate regular Python files and test those using normal testing libraries. While the partial refactoring can be painful, rewriting to more testable components can bring benefits to debugging as well.
The testbook project is an alternative to refactoring that takes an interesting approach, allowing you to write your tests outside of your notebook without requiring you to give up on notebooks. Instead, you use the library’s decorator to annotate tests; for example, @testbook('untitled_7.ipynb', execute=True) will import and execute the notebook before executing the test. You can also control which parts of the notebook are executed, but this partial execution can be brittle and prone to breakage on updates.
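A sketch of such a test, assuming the notebook defines a clean_name function in one of its cells, might look like the following:

from testbook import testbook

@testbook('untitled_7.ipynb', execute=True)
def test_clean_name(tb):
    # tb.ref gives us a handle to an object defined inside the notebook kernel.
    clean_name = tb.ref("clean_name")
    assert clean_name("  Ada Lovelace ") == "ada lovelace"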
Some people like to use in-line assertions in their notebooks as a form of testing. In this case, if something fails (e.g., the assertion that there should be some customers), the rest of the notebook will not run. While we think that having in-line assertions is great, we don’t believe it is a replacement for traditional testing.
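For example, an inline sanity check in a notebook cell (with customers standing in for a DataFrame loaded earlier in the notebook) might look like this:

assert len(customers) > 0, "No customers loaded; stopping the notebook here"
assert customers["customer_id"].is_unique, "Duplicate customer IDs detected"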
While good testing can catch many problems, sometimes the real world is more creative than we can ever be, and our code will still fail. In many situations, the worst case is that our program fails and produces an incorrect output that we don’t know is incorrect, and then we (or others) take action based on its results. Validation attempts to notify us when our job has failed so that we can take action on it before someone else does. In many ways, it is like running spell-check on a term paper before submission—if there are a few errors, OK, but if everything is red, it’s probably good to double-check. Depending on what your job does, validating it will be different.
There are a number of different tools you can use to validate the output of your Dask job, including of course Dask itself. Some tools, like TFX’s data validation, attempt to compare previous versions for statistical similarity and schema changes.[2] Pandera is relatively new, but it has Dask integration and does excellent type and schema validation. You can also do more complex statistical assertions using its Hypothesis component (which is different from the Hypothesis property-testing library discussed earlier).
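A small schema-validation sketch with Pandera (the column names and checks here are purely illustrative) could look like the following:

import pandas as pd
import pandera as pa

schema = pa.DataFrameSchema({
    "customer_id": pa.Column(int, pa.Check.ge(0)),
    "fraud_score": pa.Column(float, pa.Check.in_range(0.0, 1.0)),
})

scored = pd.DataFrame({"customer_id": [1, 2], "fraud_score": [0.1, 0.7]})
schema.validate(scored)  # raises a SchemaError if the output drifts

At the time of writing, Pandera can also validate Dask DataFrames, with the checks applied when the graph is computed.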
ML models can be more difficult to validate without impacting users, but statistical techniques can still help (as can incremental deployments). Since ML models are produced from data, a good (partial) step can be validating the data.
It is useful to think of what the implications could be of your pipeline failing. For example, you might want to spend more time validating a pipeline that determines dosages of medicine in a clinical trial, compared to a job that predicts which version of your ad will be the most successful.[3]
Even inside of a distributed system, there are various levels of "distributed." Dask is a centralized distributed system, where there is a static leader node responsible for various tasks and coordination among the workers. In more distributed systems, there is no static leader node, and if the head node goes away, the remaining peers can elect a new head node, as with ZooKeeper. In even more distributed systems, there is no head node distinction, and all of the nodes in the cluster are effectively equally capable (from a software point of view; the hardware may be different).
Centralized distributed systems tend to be faster, but they encounter scaling limits earlier and face challenges around the failure of the centralized component.
There are many different ways to split up our work, and in this book we’ve mostly looked at task and data parallelism.
dask.delayed and Python’s multiprocessing both represent task parallelism. With task parallelism, you are not limited to executing the same code. Task parallelism offers the most flexibility but requires more changes to your code to take advantage of it.
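As a small illustration of task parallelism, each delayed call below is an independent task, and the tasks do not all run the same code:

from dask import delayed

@delayed
def inc(x):
    return x + 1

@delayed
def double(x):
    return x * 2

@delayed
def add(a, b):
    return a + b

# Dask builds a task graph and runs the independent inc/double tasks in parallel.
result = add(inc(10), double(10))
print(result.compute())  # 31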
Data parallelism refers to taking the same operation and running it in parallel on different chunks (or partitions) of data. This is a wonderful technique for operations on DataFrames and arrays. Data parallelism depends on partitioning to split up the work. We cover partitioning in detail in [ch04].
Narrow transformations (or data parallelism without any aggregation or shuffle) are often much faster than wide transformations, which involve shuffles or aggregations. While this terminology is borrowed from the Spark community, the distinction (and implications for fault tolerance) applies to Dask’s data-parallel operations as well.
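The following sketch contrasts the two on a small Dask DataFrame: the assign is narrow (each partition is handled independently), while the groupby aggregation is wide:

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"store": ["a", "b", "a", "b"], "sales": [1.0, 2.0, 3.0, 4.0]})
ddf = dd.from_pandas(pdf, npartitions=2)

# Narrow: no data needs to move between partitions.
with_tax = ddf.assign(sales_with_tax=ddf.sales * 1.1)

# Wide: rows for the same store may live in different partitions,
# so this involves an aggregation/shuffle step.
per_store = ddf.groupby("store").sales.sum()

print(with_tax.compute())
print(per_store.compute())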
Data parallelism is not well suited to many different kinds of work. Even when working on data problems, it is not as well suited to doing many different things (non-uniform computation). Data parallelism is often poorly suited to computation on small amounts of data—for example, model serving where you may need to evaluate a single request at a time.
Load balancing is another way of looking at parallelism where a system (or systems) routes the requests (or tasks) to different servers. Load balancing can range from basic, like round-robin, to "smart," taking advantage of information about the relative load, resources, and data on the workers/servers to schedule the task. The more complex the load balancing is, the more work the load balancer has to do. In Dask all of this load balancing is handled centrally, which requires that the head node has a relatively complete view of most workers' state to intelligently assign tasks.
The other extreme is "simple" load balancing, where some systems, like DNS round-robin-based load balancing (not used in Dask), do not have any information about the system loads and just pick the "next" node. When tasks (or requests) are roughly equal in complexity, round-robin-based load balancing can work well. This technique is most often used for handling web requests or external API requests where you don’t have a lot of control over the client making the requests. You are most likely to see this in model serving, like translating text or predicting fraudulent transactions.
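As a toy illustration (and not how Dask schedules work internally), round-robin balancing simply hands each request to the next server in a fixed rotation:

import itertools

servers = ["model-server-1", "model-server-2", "model-server-3"]
rotation = itertools.cycle(servers)

for request_id in range(7):
    # No load information is consulted; we just take the next server.
    print(f"request {request_id} -> {next(rotation)}")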
If you search for "distributed computing concepts," you will likely come across the CAP theorem. The CAP theorem is most relevant for distributed data stores, but it’s useful to understand regardless. The theorem states that we cannot build a distributed system that is consistent, available, and partition-tolerant. Partitions can occur from hardware failure or, more commonly, from overloaded network links.
Dask itself has already made the trade-off of not being partition-tolerant; whichever side of a network partition has the "leader" is the side that continues on, and the other side is unable to progress.
It’s important to understand how this applies to the resources that you are accessing from Dask. For example, you may find yourself in a case in which a network partition means that Dask is unable to write its output. Or—even worse, in our opinion—it can result in situations in which the data you store from Dask is discarded.[4]
The Jepsen project, by Kyle Kingsbury, is one of the best projects that we know of for testing distributed storage and query systems.
Recursion refers to functions that call themselves (either directly or indirectly). When it’s indirect, it’s called co-recursion, and recursive functions whose final action is the recursive call are called tail-recursive.[5] Tail-recursive functions are similar to loops, and sometimes the language can translate tail-recursive calls into loops or other iterative constructs.
Recursive functions are sometimes avoided in languages that cannot optimize them, since there is overhead to calling a function. Instead, users will try to express the recursive logic using loops.
Excessive non-optimized recursion can result in a stack overflow error. In C, Java, C++, and more, stack memory is allocated separately from the main memory (also called heap memory). In Python, the maximum recursion depth is controlled by sys.setrecursionlimit. CPython does not optimize tail calls, so deeply recursive code is typically rewritten as a loop (or given a higher recursion limit).
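The sketch below shows the same countdown written recursively and as a loop; the recursive version fails for large inputs because each call consumes a stack frame:

import sys

def count_down_recursive(n):
    if n == 0:
        return 0
    return count_down_recursive(n - 1)

def count_down_loop(n):
    while n > 0:
        n -= 1
    return n

print(sys.getrecursionlimit())        # often 1000 by default
print(count_down_loop(1_000_000))     # fine
# count_down_recursive(1_000_000)     # would raise RecursionError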
In Dask, while recursive calls don’t have the exact same stack problem, excessive recursion can be one of the causes of load on the head node. This is because scheduling each recursive call must pass through the head node, and a large number of recursive calls will cause Dask’s scheduler to slow down long before any stack size issues are encountered.
Versioning is an important computer science concept, and it can be applied to both code and data. Ideally, versioning makes it easy to undo errors and go back to earlier versions or explore multiple directions simultaneously. Many of the items we produce are a combination of both our code and our data; to truly meet the goal of being able to quickly roll back and support experimentation, you will want to have versioning for both your code and your data.
Version control tools for source code have existed for a long time. For code, Git has become the most popular open source version control system in usage, overtaking tools such as Subversion, Concurrent Versions System (CVS), and many others.
While understanding Git thoroughly can be very complicated,[6] for common usage there are a few core commands that often see you through. Teaching Git is beyond the scope of this appendix, but there are a great many resources, including Head First Git by Raju Gandhi (O’Reilly) and Oh Shit, Git! by Julia Evans, as well as free online resources.
Unfortunately, software version control tools don’t currently have the best notebook integration experience and often require additional tools like ReviewNB to make the changes understandable.
Now, a natural question is, can you use the same tools for versioning your data as your software? Sometimes you can: provided that your data is small enough and does not contain any personal information, using source control on data can be OK. However, software tends to be stored as text and is normally much smaller than your data, and many source control tools do not work well once files exceed even a few dozen megabytes.
Instead, tools like LakeFS add Git-like versioning semantics on top of existing external data stores (e.g., S3, HDFS, Iceberg, Delta).[7] Another option is to make copies of your tables manually, but we find this leads to the familiar "-final2-really-final" problem with naming notebooks and Word docs.
So far, we’ve talked about isolation in the context of being able to have your own Python packages, but there are more kinds of isolation. Other levels of isolation include CPU, GPU, memory, and network.[8] Many cluster managers do not provide full isolation, which means that if your tasks get scheduled on the wrong nodes, they might have poor performance. A common solution is to request resource amounts in line with a full node so that no other jobs are scheduled alongside your own.
Strict isolation can also have downsides, especially if it doesn’t support bursting. Strict isolation without bursting can result in resource waste, but for mission-critical workflows this is often the trade-off that is made.
Fault tolerance is a key concept in distributed computing because the more computers you add, the higher the probability of a fault on any given computer. In some smaller deployments of Dask, machine fault tolerance is not as important, so if you’re running Dask exclusively in local mode or on two or three computers you keep under your desk, you might be OK to skip this section.[9]
Dask’s core fault tolerance approach is to re-compute lost data. This is the approach chosen by many modern data-parallel systems since failures are not super common, so making the situation with no failures fast is the priority.[10]
It is important to consider, with fault tolerance of Dask, what the fault condition possibilities are in the components Dask is connected to. While re-compute is a fine approach for distributed computing, distributed storage has different trade-offs.
Dask’s approach of re-computing on failure depends on the data that Dask used for the computation remaining available to reload when needed. In most systems this will be the case, but in some streaming systems you may need to configure longer TTLs or otherwise add a buffer on top to provide the reliability that Dask requires. Also, if you are deploying your own storage layer (e.g., MinIO), it’s important to deploy it in a way that minimizes data loss.
Dask’s fault tolerance does not extend to the leader node. A partial solution to this is often called high availability, where a system outside of Dask monitors and restarts your Dask leader node.
Fault tolerance techniques are often also used when scaling down, since they both involve the loss of a node.
Scalability refers to the ability of a distributed system to grow to handle larger problems and the sometimes overlooked ability to shrink when the needs are reduced (say, after the grad students go to sleep). In computer science, we generally categorize scalability as either horizontal or vertical. Horizontal scaling refers to adding more computers, whereas vertical scaling refers to using bigger computers.
Another important consideration is auto-scaling versus manual scaling. In auto-scaling, the execution engine (in our case, Dask) will scale the resources for us. Dask’s auto-scaler will horizontally scale by adding your workers when needed (provided the deployment supports it). To scale up vertically, you can add larger instance types to Dask’s auto-scaler and request those resources with your jobs.
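A minimal sketch of adaptive scaling on a local cluster is shown below; on a real deployment you would use the matching cluster class (Kubernetes, YARN, and so on) instead of LocalCluster:

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=1)
cluster.adapt(minimum=1, maximum=4)   # let Dask add/remove workers with load
client = Client(cluster)

futures = client.map(lambda x: x ** 2, range(100))
print(sum(client.gather(futures)))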
Note
In a way, Dask’s task "stealing" can be viewed as a form of automatic vertical scaling. If a node is incapable of (or especially slow at) handling a task, then another Dask worker can "steal" the task. In practice, the auto-scaler does not allocate higher-resource nodes unless you schedule a task that asks for those resources.
Dask jobs are frequently data-heavy, and the cost of transferring data to the CPU (or GPU) can have a large impact on performance. CPU cache is normally more than an order of magnitude faster than reading from memory. Reading data from an SSD is roughly 4x slower than memory, and sending data within a data center can be ~10 times slower.[11] CPU caches are also small relative to your data, so only a small fraction of it can be cached at any one time.
Transferring data from RAM (or even worse, from disk/network) can result in the CPU stalling or not being able to do any useful work. This makes chaining operations especially important.
The Computers Are Fast website does an excellent job of illustrating these performance impacts with real code.
Hashing is an important part not only of Dask but also of computer science in general. Dask uses hashing to convert complex data types into integers to assign the data to the correct partition. Hashing is generally a "one-way" operation that maps a larger key space into a smaller key space. For many operations, like assigning data to the correct partitions, you want hashing to be fast. However, for tasks like pseudonymization and passwords, you intentionally choose slower hashing algorithms and frequently add more iterations to make it more difficult to reverse. It’s important to pick the right hashing algorithm to match your purposes, since the different behaviors could be a feature in one use case but a bug in the other.
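The sketch below contrasts the two uses (it is not Dask’s internal hashing): a fast hash to pick one of 16 partitions, and a deliberately slow, salted hash for something like password storage:

import hashlib
import os

key = "customer-12345"

# Fast: fine for assigning a key to one of 16 partitions.
partition = int(hashlib.md5(key.encode()).hexdigest(), 16) % 16

# Deliberately slow: many iterations make each brute-force guess expensive.
salt = os.urandom(16)
password_hash = hashlib.pbkdf2_hmac("sha256", b"hunter2", salt, 600_000)

print(partition, password_hash.hex()[:16])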
Data transfer costs can quickly overwhelm data compute costs for simple computation. When possible, scheduling tasks on nodes that already have the data is often much faster since the task has to be scheduled somewhere (e.g., you pay the network cost of copying the task regardless), but you can avoid moving the data if you put the task in the right place. Network copies are also generally slower than disk.
Dask allows you to specify a desired worker in your client.submit call with workers=. Also, if you have data that is going to be accessed everywhere, rather than doing a regular scatter, you can broadcast it by adding broadcast=True so that all workers have a full copy of the collection.
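A sketch of both hints is shown below; the worker address and the lookup table are purely illustrative:

from dask.distributed import Client

client = Client()  # connect to an existing cluster, or start a local one

lookup_table = {"a": 1, "b": 2}

# Push the lookup table to every worker up front instead of on demand.
[table_future] = client.scatter([lookup_table], broadcast=True)

def enrich(row_key, table):
    return table.get(row_key, -1)

# Prefer workers that already hold the needed data, but allow fallback.
future = client.submit(enrich, "a", table_future,
                       workers=["tcp://127.0.0.1:34567"],
                       allow_other_workers=True)
print(future.result())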
In most software development the concept of exactly once is so much of a given that we don’t even think of it as a requirement. For example, doubly applied debits or credits to a bank account could be catastrophic. Exactly-once execution in Dask requires the use of external systems because of Dask’s approach to fault tolerance. A common approach is to use a database (distributed or non-distributed) along with transactions to ensure exactly-once execution.
Not all distributed systems have this challenge. Systems in which the inputs and outputs are controlled and fault tolerance is achieved by redundant writes have an easier time with exactly-once execution. Some systems that use re-compute on failure are still able to offer exactly-once execution by integrating distributed locks.
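A minimal sketch of this pattern, using SQLite as a stand-in for a real transactional database and an illustrative table layout, makes re-computed writes idempotent:

import sqlite3

conn = sqlite3.connect("results.db")
conn.execute(
    """CREATE TABLE IF NOT EXISTS results (
           partition_id TEXT PRIMARY KEY,
           total REAL
       )"""
)

def write_partition_result(partition_id, total):
    # If a failed task is retried, the second INSERT is ignored, so the result
    # is recorded exactly once even though it may have been computed twice.
    with conn:  # runs the statement inside a transaction
        conn.execute(
            "INSERT OR IGNORE INTO results (partition_id, total) VALUES (?, ?)",
            (partition_id, total),
        )

write_partition_result("2024-01-01-part-0", 42.0)
write_partition_result("2024-01-01-part-0", 42.0)  # retry: no duplicate row

In a real job each worker would open its own connection to a shared database rather than a local SQLite file; the key idea is that the unique key plus a transaction turns "at least once" computation into "exactly once" recorded output.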
Distributed systems are fun, but as you can see from the distributed systems concepts, they add a substantial amount of overhead. If you don’t need distributed systems, then using Dask in local mode and using local data stores can greatly simplify your life. Regardless of whether you decide on local mode or distributed, having an understanding of general systems concepts will help you build better Dask pipelines.