Advanced Task Scheduling: Futures and Friends

Dask’s computational flow follows these four main logical steps, which can happen concurrently and recursively for each task:

  1. Collect and read the input data.

  2. Define and build the compute graph representing the set of computations that needs to be performed on the data.

  3. Run the computation (this happens when you run .compute()).

  4. Pass the result as data to the next step.
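
As a minimal sketch of these four steps using dask.delayed (the functions double and total and the tiny input list are made up for illustration and do not come from the book's examples):

```python
import dask


@dask.delayed
def double(x):
    return 2 * x


@dask.delayed
def total(values):
    return sum(values)


# 1. Collect the input data (a tiny in-memory list here).
data = [1, 2, 3]

# 2. Build the compute graph; nothing has been computed yet.
graph = total([double(x) for x in data])

# 3. Run the computation.
result = graph.compute()

# 4. Pass the result as data to the next step.
next_input = result + 1
```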

Now we introduce more ways to control this flow with futures. So far, you have mostly seen lazy operations in Dask, where Dask doesn’t do the work until something forces the computation. This pattern has a number of benefits, including allowing Dask’s optimizer to combine steps when doing so makes sense. However, not all tasks are well suited to lazy evaluation. One common pattern that does not fit is fire-and-forget, where we call a function for its side effect[1] and do not necessarily care about the output. Trying to express this with lazy evaluation (e.g., dask.delayed) results in unnecessary blocking to force computation. When lazy evaluation is not what you need, you can explore Dask’s futures. Futures can be used for much more than just fire-and-forget, and you can return results from them. This chapter will explore a number of common use cases for futures.

Note

You may already be familiar with futures from Python. Dask’s futures are an extension of Python’s concurrent.futures library, allowing you to use them in its place. Similar to using Dask DataFrames in place of pandas DataFrames, the behavior can be a bit different (although the differences here are smaller).

Dask futures are a part of Dask’s distributed client library, so you will get started by importing it with from dask.distributed import Client.

Tip

Despite the name, you can use Dask’s distributed client locally. Refer to [distributed_ch03_1687438078727] for different local deployment types.
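
To make the comparison with concurrent.futures concrete, here is a rough side-by-side sketch. It assumes an in-process local cluster created with Client(processes=False), and the square function is purely illustrative:

```python
from concurrent.futures import ThreadPoolExecutor

from dask.distributed import Client


def square(x):
    return x * x


# Standard library: submit work to a local thread pool.
with ThreadPoolExecutor(max_workers=2) as pool:
    stdlib_future = pool.submit(square, 4)
    stdlib_result = stdlib_future.result()

# Dask: the same submit/result shape, but the work goes through Dask's scheduler.
client = Client(processes=False)  # assumption: local, in-process cluster
dask_future = client.submit(square, 4)
dask_result = dask_future.result()
client.close()
```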

Lazy and Eager Evaluation Revisited

Eager evaluation is the most common form of evaluation in programming, including in Python. While most eager evaluation is blocking—that is, the program will not move to the next statement until the result is completed—you can still have asynchronous/non-blocking eager evaluation. Futures are one way of representing non-blocking eager computation.

Non-blocking eager evaluation still has some potential downsides compared to lazy evaluation. Some of these challenges include:

  • The inability to combine adjacent stages (sometimes known as pipelining)

  • Unnecessary computation

    • Repeated subgraphs cannot be detected by Dask’s optimizer.

    • Even if nothing depends on the result of the future, it may be computed.[2]

  • Potential excessive blocking when futures launch and block on other futures

  • A need for more careful memory management

Not all Python code is eagerly evaluated. In Python 3 some built-in functions use lazy evaluation; for example, map returns an iterator and evaluates elements only on request.
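
A small standard-library sketch shows this laziness. The noisy_square helper is hypothetical and records each call so you can see when evaluation actually happens:

```python
calls = []


def noisy_square(x):
    calls.append(x)  # record that the function really ran
    return x * x


lazy = map(noisy_square, [1, 2, 3])
calls_before = list(calls)       # map has not called the function yet

first = next(lazy)               # requesting one element evaluates one call
calls_after_first = list(calls)

rest = list(lazy)                # consuming the iterator evaluates the rest
```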

Use Cases for Futures

Many common use cases can be made faster with careful application of futures:

Integrating with other async servers (like Tornado)

Although we generally believe that most of the time Dask is not the right solution for the "hot path," there are exceptions, such as dynamically computed analytic dashboards.

Request/response pattern

Make a call to a remote service and (later) block on its result. This can include querying services like databases, remote procedure calls, or even websites.

IO

Input/output can often be slow, but you know you want it to start happening as soon as possible.

Timeouts

Sometimes you care about a result only if you can get it within a certain period of time. For example, think of a boosted ML model where you need to make a decision within a certain timeframe, collecting all scores from available models quickly and then skipping any that take too long.

Fire-and-forget

Sometimes you might not care about the result of a function call, but you do want to ensure it is called. Futures allow you to ensure a computation occurs without having to block on the result.

Actors

The results from calling actors are futures. We cover actors in the next chapter.

Launching futures in Dask is non-blocking, whereas computing tasks in Dask is blocking. This means that when you submit a future to Dask, while it begins work right away, it does not stop (or block) your program from continuing.

Launching Futures

The syntax for launching Dask futures is a little different than that for dask.delayed. Dask futures are launched from the Dask distributed client with either submit for single futures or map for multiple futures, as shown in Launching futures.

Example 1. Launching futures
link:./examples/dask/Dask-Futures.py[role=include]
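
A minimal sketch of launching futures with submit and map, assuming an in-process local client; slow_increment is a hypothetical stand-in for real work:

```python
import time

from dask.distributed import Client


def slow_increment(x):
    time.sleep(0.1)
    return x + 1


client = Client(processes=False)  # assumption: local, in-process cluster

# submit launches a single future; work starts right away.
single = client.submit(slow_increment, 1)

# map launches one future per input element.
many = client.map(slow_increment, range(3))

single_value = single.result()
many_values = client.gather(many)
client.close()
```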

Unlike with dask.delayed, as soon as the future is launched, Dask begins to compute the value.

Note

While this map is somewhat similar to the map on Dask bags, each item results in a separate task, whereas bags are able to group together tasks into partitions to reduce the overhead (although they are lazily evaluated).

Some actions in Dask, like persist() on Dask collections, use futures under the hood. You can get the futures of the persisted collection by calling futures_of. These futures follow the same life cycle as the futures that you launch yourself.
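
As an illustrative sketch (the bag contents and partition count are arbitrary), you can persist a small Dask bag while a distributed client is active and inspect the futures behind it:

```python
import dask.bag as db
from dask.distributed import Client, futures_of, wait

client = Client(processes=False)  # assumption: local, in-process cluster

bag = db.from_sequence(range(6), npartitions=3).map(lambda x: x * 2)
persisted = bag.persist()         # starts computing in the background

futures = futures_of(persisted)   # the futures backing the persisted bag
wait(futures)                     # block until they are all done
statuses = [f.status for f in futures]

values = persisted.compute()
client.close()
```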

Future Life Cycle

Futures have a different life cycle from dask.delayed beyond eager computation. With dask.delayed, intermediate computations are automatically cleaned up; however, the results of Dask futures are stored until either the future is explicitly canceled or the reference to it is garbage collected in Python. If you no longer need the value of a future, you can cancel it and free any storage space or cores used by calling .cancel(). The future life cycle is illustrated in Future life cycle.

Example 2. Future life cycle
link:./examples/dask/Dask-Futures.py[role=include]
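
One way to observe this life cycle, sketched under the assumption of an in-process local client and a hypothetical slow function:

```python
import time

from dask.distributed import Client


def slow(x):
    time.sleep(0.2)
    return x


client = Client(processes=False)  # assumption: local, in-process cluster

kept = client.submit(slow, 1)
kept_value = kept.result()        # blocks until the future has finished
kept_status = kept.status

unwanted = client.submit(slow, 2)
unwanted.cancel()                 # explicitly give up on this result
was_cancelled = unwanted.cancelled()

# Dropping the last reference lets Dask release the stored result.
del kept
client.close()
```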

Canceling a future behaves differently than deleting or depending on garbage collection. If there is another reference to the future, then deleting or setting the individual reference to None will not cancel the future. This means the result will remain stored in Dask. On the other hand, canceling futures has the downside that if you are incorrect and the future’s value is needed, this will cause an error.

Warning

When using Dask in a Jupyter notebook, the notebook may "hold on to" the result of any previous cell, so even if the future is unnamed, it will remain present in Dask. There is a discussion on Discourse with more context for those interested.

The string representation of a future will show you where it is in its life cycle (e.g., Future: slow status: cancelled,).

Fire-and-Forget

Sometimes you no longer need a future, but you also don’t want it to be canceled. This pattern is called fire-and-forget. This is most useful for things like writing data out, updating a database, or other side effects. If all references to a future are lost, garbage collection can result in the future being canceled. To work around this, Dask has the aptly named fire_and_forget method, which allows you to take advantage of this pattern, as shown in Fire-and-forget, without needing to keep references around.

Example 3. Fire-and-forget
link:./examples/dask/Dask-Futures.py[role=include]
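
A rough sketch of the pattern follows. The write_marker function and the temporary file are illustrative assumptions, and the polling loop exists only so the sketch can show that the side effect happened:

```python
import os
import tempfile
import time

from dask.distributed import Client, fire_and_forget


def write_marker(path):
    with open(path, "w") as f:
        f.write("done")


client = Client(processes=False)  # assumption: local, in-process cluster
marker_path = os.path.join(tempfile.mkdtemp(), "marker.txt")

# No reference to the future is kept; Dask still runs the task.
fire_and_forget(client.submit(write_marker, marker_path))

# Demonstration only: poll briefly for the side effect.
deadline = time.time() + 5
while not os.path.exists(marker_path) and time.time() < deadline:
    time.sleep(0.05)
marker_written = os.path.exists(marker_path)
client.close()
```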

Retrieving Results

More commonly, you will eventually want to know what the future has computed, or at least whether it encountered an error. Futures have the blocking method result, as shown in Getting the result, which gives you back the value computed in the future or raises the exception from the future.

Example 4. Getting the result
link:./examples/dask/Dask-Futures.py[role=include]
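
For instance, assuming a hypothetical divide function and an in-process local client, result either returns the value or re-raises the task's exception:

```python
from dask.distributed import Client


def divide(a, b):
    return a / b


client = Client(processes=False)  # assumption: local, in-process cluster

ok_value = client.submit(divide, 6, 3).result()  # blocks, then returns the value

error_name = None
try:
    client.submit(divide, 1, 0).result()         # re-raises the task's exception
except ZeroDivisionError as exc:
    error_name = type(exc).__name__

client.close()
```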

You can extend this to multiple futures, as in Getting a list of results, but there are ways to do it faster.

Example 5. Getting a list of results
link:./examples/dask/Dask-Futures.py[role=include]
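
A simple sketch of collecting a list of results, again with an illustrative square function. Calling result on each future in turn works, and client.gather fetches the whole list in one call:

```python
from dask.distributed import Client


def square(x):
    return x * x


client = Client(processes=False)  # assumption: local, in-process cluster
futures = client.map(square, range(5))

# Block on each future in submission order.
one_by_one = [f.result() for f in futures]

# Or gather the whole list in a single call.
gathered = client.gather(futures)
client.close()
```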

If you’ve got multiple futures together—say, you created them with map—you can get the results back as they become available (see Getting a list of results as they become available). If you can process the results out of order, this can greatly improve your processing time.

Example 6. Getting a list of results as they become available
link:./examples/dask/Dask-Futures.py[role=include]
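
One possible sketch uses as_completed from dask.distributed. The sleepy function and its delays are made up, and the order in which results arrive depends on scheduling:

```python
import time

from dask.distributed import Client, as_completed


def sleepy(seconds):
    time.sleep(seconds)
    return seconds


# Assumption: local, in-process cluster with a few threads.
client = Client(processes=False, n_workers=1, threads_per_worker=4)
futures = client.map(sleepy, [0.3, 0.1, 0.2])

running_total = 0.0
completion_order = []
for future in as_completed(futures):  # yields each future as it finishes
    value = future.result()
    completion_order.append(value)
    running_total += value            # the "business logic" for each element

client.close()
```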

In the preceding example, by processing futures as they complete you can have the main thread do its "business logic" (similar to the combine step for an aggregate) for each element as it becomes available. If the futures finish at different times, this can be a large speed increase.

If you have a deadline, like scoring a model for ad serving[3] or doing something funky with the stock market, you might not want to wait for all of your futures. Instead, the wait function allows you to fetch results with a timeout, as shown in Getting the first future (within a time limit).

Example 7. Getting the first future (within a time limit)
link:./examples/dask/Dask-Futures.py[role=include]
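
As a hedged sketch, wait can return as soon as the first future completes while still enforcing an overall timeout. The sleepy function, the delays, and the five-second limit are arbitrary choices:

```python
import time

from dask.distributed import Client, wait


def sleepy(seconds):
    time.sleep(seconds)
    return seconds


# Assumption: local, in-process cluster with two threads so both tasks start.
client = Client(processes=False, n_workers=1, threads_per_worker=2)
futures = client.map(sleepy, [0.1, 1.0])

# Return once any future is done, waiting at most five seconds.
outcome = wait(futures, timeout=5, return_when="FIRST_COMPLETED")
first_value = next(iter(outcome.done)).result()

# Give up on whatever has not finished.
client.cancel(list(outcome.not_done))
client.close()
```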

This time limit can apply either to the entire set or to one future at a time. If you want all futures finished by a given time, then you need a bit more work, as shown in Getting any futures that finish within a time limit.

Example 8. Getting any futures that finish within a time limit
link:./examples/dask/Dask-Futures.py[role=include]
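
One way this might look is to wait on the whole set with a timeout, tolerate the timeout error, and then keep only the futures that finished. The delays and the one-second deadline are illustrative, and catching both timeout types is a defensive assumption across Python versions:

```python
import asyncio
import time

from dask.distributed import Client, wait


def sleepy(seconds):
    time.sleep(seconds)
    return seconds


# Assumption: local, in-process cluster with enough threads to start every task.
client = Client(processes=False, n_workers=1, threads_per_worker=3)
futures = client.map(sleepy, [0.1, 0.2, 2.0])

try:
    wait(futures, timeout=1.0)  # deadline for the whole set
except (TimeoutError, asyncio.TimeoutError):
    pass                        # some futures missed the deadline

finished = [f.result() for f in futures if f.status == "finished"]

# Cancel the stragglers so they do not hold on to resources.
client.cancel([f for f in futures if f.status != "finished"])
client.close()
```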

Now that you can get the results from futures, you can compare the execution time of dask.delayed versus Dask futures, as shown in Seeing that futures can be faster.

Example 9. Seeing that futures can be faster
link:./examples/dask/Dask-Futures.py[role=include]
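
A contrived timing sketch along these lines, with a hypothetical slow function and a sleep standing in for other work on the main thread:

```python
import time

import dask
from dask.distributed import Client


def slow(x):
    time.sleep(0.5)
    return x


client = Client(processes=False)  # assumption: local, in-process cluster

lazy = dask.delayed(slow)(1)      # nothing runs yet
eager = client.submit(slow, 2)    # starts running right away

time.sleep(0.6)                   # stand-in for other work on the main thread

start = time.perf_counter()
eager_value = eager.result()      # the future has already had time to finish
eager_wait = time.perf_counter() - start

start = time.perf_counter()
lazy_value = lazy.compute()       # the delayed work only starts here
lazy_wait = time.perf_counter() - start

client.close()
```

Comparing eager_wait with lazy_wait shows how much of the work was hidden behind the other activity.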

In this (albeit contrived) example, you can see how, by starting the work as soon as possible, the future is completed by the time you get the result, whereas the dask.delayed starts only when you get there.

Nested Futures

As with dask.delayed, you can also launch futures from inside futures. The syntax is a bit different because you need an instance of the client object, which is not serializable. To work around this, dask.distributed has the special function get_client, which returns the client inside a distributed function. Once you have the client, you can launch the future as normal, as shown in Launching a nested future.

Example 10. Launching a nested future
link:./examples/dask/Dask-Futures.py[role=include]
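
A sketch of a nested future, assuming hypothetical inner and outer functions. The secede and rejoin calls are an addition not discussed in the text: they hand the worker thread back while the outer task blocks, which helps avoid deadlock when threads are scarce:

```python
from dask.distributed import Client, get_client, rejoin, secede


def inner(x):
    return x + 1


def outer(x):
    client = get_client()          # look up the client from inside the task
    future = client.submit(inner, x)
    secede()                       # release this worker thread while waiting
    value = future.result()
    rejoin()                       # take a thread slot back before continuing
    return value * 2


client = Client(processes=False)   # assumption: local, in-process cluster
nested_result = client.submit(outer, 3).result()
client.close()
```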

Note that since Dask uses a centralized scheduler, the client is communicating with that centralized scheduler to determine where to place the future.

Distributed Data Structures for Scheduling

Dask also has a collection of data structures to simplify task coordination. These data structures include queues, locks/semaphores, events, and publish/subscribe topics. These distributed data structures aim to behave similarly to their local counterparts, but it’s important to remember that the distributed nature of Dask adds overhead for coordination and remote procedure calls.
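
As one illustration, a distributed Queue can carry values from tasks back to the client. This is a sketch under assumptions: the queue name "results" and the produce function are invented for the example, and it runs on an in-process local cluster:

```python
from dask.distributed import Client, Queue, wait


def produce(i):
    # Inside a task, the queue is looked up by name.
    Queue("results").put(i * 10)


client = Client(processes=False)  # assumption: local, in-process cluster

# Create the queue on the client first and keep a reference to it.
results_queue = Queue("results")

wait(client.map(produce, range(3)))

# Each get is a round trip to the scheduler, hence the timeout.
items = sorted(results_queue.get(timeout=5) for _ in range(3))
client.close()
```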

For example, Dask’s distributed variable is called Variable, which implements get, set, and delete. The get function takes a timeout, which should remind you that these operations are distributed in nature and are therefore slower than updating a local variable. As with multi-threaded global variables, race conditions can occur when different workers update the same variable.

Tip

You can name Dask’s distributed data structures when constructing them, and two resources with the same name will be resolved to the same object even if they are constructed separately.
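
To illustrate both naming and the race-condition concern, here is a sketch that guards a read-modify-write on a named Variable with a named Lock. The names "counter" and "counter-lock" and the increment function are assumptions made for the example:

```python
from dask.distributed import Client, Lock, Variable, wait


def increment(_):
    counter = Variable("counter")  # resolves to the same variable by name
    with Lock("counter-lock"):     # serialize the read-modify-write
        counter.set(counter.get(timeout=5) + 1)


# Assumption: local, in-process cluster with several threads.
client = Client(processes=False, n_workers=1, threads_per_worker=4)

counter = Variable("counter")
counter.set(0)

wait(client.map(increment, range(5)))

# A separately constructed Variable with the same name sees the same value.
final_count = Variable("counter").get(timeout=5)
client.close()
```

Without the lock, two tasks could read the same value and one increment would be lost.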

Conclusion

While Dask’s primary building block is dask.delayed, it’s not the only option. You can control more of your execution flow by using Dask’s futures. Futures are ideal for I/O, model inference, and deadline-sensitive applications. In exchange for this additional control, you are responsible for managing the life cycle of your futures and the data they produce in a way that you are not with dask.delayed. Dask also has a number of distributed data structures, including queues, variables, and locks. While these distributed data structures are more expensive than their local counterparts, they also give you another layer of flexibility around controlling your task scheduling.


1. Like writing a file to disk or updating a database record.
2. Although if the only reference to it gets garbage collected, it may not.
3. We believe that this is one of the areas in which Dask has more room for growth, and if you do want to implement a microservice for deadline-critical events, you may want to explore using Dask in conjunction with other systems, like Ray.