In the previous chapter, you learned about Ray remote functions, which are very useful for the parallel execution of stateless functions. But what if you need to maintain state between invocations? Examples of such situations range from a simple counter to a neural network during training to a simulator environment. One option for maintaining state in these situations is to return the state along with the result and pass it to the next call. Although technically this will work, it is not the best solution, because of the large amount of data that has to be passed around (especially as the size of the state starts to grow).
Ray uses a concept called actors, which we will cover in this chapter, to manage the state.
Note
Much like Ray’s "remote" functions, all Ray actors are "remote" actors, even when running on the same machine.
In a nutshell, an actor is a computer process with an address (handle). This means that it can also store things in memory - private to the actor process. Before delving into the details of how to implement and scale Ray actors, let’s first take a look at the concepts behind them. Actors come from the actor model design pattern. Understanding the actor model is key to effectively managing the state and concurrency.
The actor model is a conceptual model for dealing with concurrent computation and was introduced by Carl Hewitt in 1973. At the heart of this model is the actor: a universal primitive of concurrent computation with its own state.
An actor has a simple job:
- Store data
- Receive messages from other actors
- Pass messages to other actors
- Create additional child actors
The data that an actor stores is hidden from outside an actor and can be accessed/modified only by the actor itself. To change the actor’s state it is necessary to send messages to the actor that will modify the state. (Compare this to using method calls in object-oriented programming.)
To ensure an actor’s state consistency, actors process one request at a time. This means that all actor method invocations are globally serialized for a given actor. To improve the throughput, people often create a pool of actors (assuming they can shard or replicate the actor’s state).
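To make the message-passing idea concrete, here is a minimal plain-Python sketch of the actor model (it is not Ray code). The `CounterActor` class and the `"increment"` and `"stop"` message names are hypothetical, chosen only for illustration. The state is touched only by the actor's own thread, and messages are processed one at a time from a mailbox.

```python
import queue
import threading


class CounterActor:
    """A toy actor: private state plus a mailbox drained by one thread."""

    def __init__(self):
        self._count = 0                  # private state, never accessed directly
        self._mailbox = queue.Queue()    # incoming messages
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        # Messages are handled strictly one at a time, in arrival order.
        while True:
            message, reply = self._mailbox.get()
            if message == "stop":
                reply.put(None)
                break
            if message == "increment":
                self._count += 1
            reply.put(self._count)

    def send(self, message):
        """Post a message and return a queue that will hold the reply."""
        reply = queue.Queue(maxsize=1)
        self._mailbox.put((message, reply))
        return reply


actor = CounterActor()
replies = [actor.send("increment") for _ in range(3)]
results = [reply.get() for reply in replies]
print(results)  # [1, 2, 3]
actor.send("stop").get()
```

Because a single thread drains the mailbox, no lock is needed around `_count`; this is the same reason single-threaded actors avoid most concurrency issues.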
The actor model is a good fit for many distributed systems scenarios. Here are some typical use cases where usage of the Actor model can be advantageous:
- You need to deal with a large distributed state that is hard to synchronize between invocations.
- You want to work with single-threaded objects that do not require significant interaction from external components.
In both of these situations, you would implement the standalone parts of the work inside of an actor. You can put each piece of independent state inside of its own actor, and then any changes to the state come in through the actor. Most actor system implementations avoid concurrency issues by only using single-threaded actors.
Now that you know the general principles of the actor model, let’s take a closer look at Ray’s remote actors.
Ray implements remote actors as stateful workers. When you create a new remote actor, Ray creates a new worker and schedules the actor’s methods on that worker.
A common example of an actor is a bank account. Let’s take a look at how you can implement an account using Ray remote actors. The creation of a Ray remote actor is as simple as decorating a Python class with the @ray.remote decorator (Implementing Ray remote actor).
link:examples/ray_examples/actors/simple_account.py[role=include]
In both Ray remote functions and actors, you can throw exceptions. This will cause a function/method throwing an exception to return immediately.
In the case of remote actors, after the exception is thrown, the actor will continue running normally.
You can use normal Python exception processing, in this case, to deal with exceptions in the method invoker code (see below).
The Account actor class itself is fairly simple and has four methods:
- Constructor - creates an account, based on the starting and minimum balance. It also makes sure that the current balance is larger than the minimal one and throws an exception otherwise.
- Balance - returns the current balance of the account. Because an actor’s state is private to the actor, access to it is only available through the actor’s method.
- Deposit - deposits an amount to the account and returns a new balance.
- Withdraw - withdraws an amount from the account and returns a new balance. It also ensures that the remaining balance is greater than the predefined minimum balance and throws an exception otherwise.
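As a rough illustration of the four methods just described, here is a plain-Python sketch of such an account class. It is not the book's listing: the parameter names, the use of `ValueError`, and the rejection of negative amounts are assumptions made for this sketch. The class is deliberately left undecorated so that it runs without Ray.

```python
class Account:
    """Plain-Python sketch of the account described above (assumed details)."""

    def __init__(self, balance: float, minimal_balance: float):
        if balance < minimal_balance:
            raise ValueError(
                f"Starting balance {balance} is below the minimum {minimal_balance}")
        self.minimal = minimal_balance
        self._balance = balance   # private state, reachable only through methods

    def balance(self) -> float:
        return self._balance

    def deposit(self, amount: float) -> float:
        if amount < 0:
            raise ValueError("Cannot deposit a negative amount")
        self._balance += amount
        return self._balance

    def withdraw(self, amount: float) -> float:
        if amount < 0:
            raise ValueError("Cannot withdraw a negative amount")
        if self._balance - amount < self.minimal:
            raise ValueError("Withdrawal would drop the balance below the minimum")
        self._balance -= amount
        return self._balance


# With Ray installed, the same class becomes an actor class by placing
# @ray.remote above the class definition (or by calling ray.remote(Account)).
account = Account(balance=100.0, minimal_balance=20.0)
print(account.balance())       # 100.0
print(account.withdraw(40.0))  # 60.0
print(account.deposit(30.0))   # 90.0
```

Keeping the balance in a private attribute mirrors the actor rule that state is reachable only through the actor's methods.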
Now that you have defined the class, you need to use Ray’s .remote to create an instance of this actor.
link:examples/ray_examples/actors/simple_account.py[role=include]
Actor lifetimes and metadata (e.g., IP address and port) are managed by the global control store (GCS) service, which is currently a single point of failure. We cover the GCS in more detail in the next chapter.
Each client of the actor may cache this metadata and use it to send tasks to the actor directly over gRPC without querying the GCS. When an actor is created in Python, the creating worker first synchronously registers the actor with the GCS. This ensures correctness in case the creating worker fails before the actor can be created. Once the GCS responds, the remainder of the actor creation process is asynchronous. The creating worker process queues locally a special task known as the actor creation task. This is similar to a normal non-actor task, except that its specified resources are acquired for the lifetime of the actor process. The creator asynchronously resolves the dependencies for the actor creation task, then sends it to the GCS service to be scheduled. Meanwhile, the Python call to create the actor immediately returns an “actor handle” that can be used even if the actor creation task has not yet been scheduled.
An actor’s method execution is similar to a remote task invocation: it is submitted directly to the actor process via gRPC, will not run until all ObjectRef dependencies have been resolved, and returns a future. Note that no resource allocation is required for an actor’s method invocation (it is performed during the actor’s creation), which makes it faster than a remote function invocation.
Here account_actor represents an actor handle. These handles play an important role in the actor’s life cycle. Actor processes will be terminated automatically when the initial actor handle goes out of scope in Python (note that in this case the actor’s state is lost).
Tip
You can create multiple distinct actors from the same class. They will each have their own independent state.
Like an ObjectRef, an actor handle can be passed as a parameter to another actor, a Ray remote function, or ordinary Python code.
Note that Implementing Ray remote actor uses the @ray.remote decorator to define an ordinary Python class as a Ray remote actor. Alternatively, instead of using the decorator, you can use the following code to convert a Python class into a remote actor:
link:examples/ray_examples/actors/simple_account.py[role=include]
Once you have a remote actor in place, you can invoke it using the following code (Invoking remote actor):
print(f"Current balance {ray.get(account_actor.balance.remote())}")
print(f"New balance {ray.get(account_actor.withdraw.remote(40.))}")
print(f"New balance {ray.get(account_actor.deposit.remote(30.))}")
Tip
It’s important to handle exceptions, which in the example can occur in both the deposit and withdrawal method code. To handle the exceptions, augment Invoking remote actor with try/except clauses like the following:
try:
    print(f"New balance {ray.get(account_actor.withdraw.remote(-40.))}")
except Exception as e:
    print(f"Oops! {e} occurred.")
This ensures that the code intercepts all the exceptions thrown by the actor’s code and takes the necessary actions.
You can also create named actors using the following:
link:examples/ray_examples/actors/simple_account.py[role=include]
Once the actor has a name you can use this name to obtain the actor’s handle from any place in the code:
ray.get_actor('Account')
As defined above, the default actor’s life cycle is linked to the actor’s handle being in scope.
Actor lifetime can be decoupled from its handle being in scope, allowing an actor to persist even after the driver process exits. You can create a detached actor by specifying the lifetime parameter as detached, as shown below:
link:examples/ray_examples/actors/simple_account.py[role=include]
In theory, you can make an actor detached without specifying its name, but since ray.get_actor operates by name, detached actors make the most sense with a name. You should name your detached actors so you can access them, even after the actor’s handle is out of scope. The detached actor itself can own any other tasks and objects.
In addition, you can manually delete an actor either from inside the actor, using ray.actor.exit_actor(), or through its handle, using ray.kill(account_actor). This can be useful if you know that you do not need specific actors anymore and want to reclaim the resources.
As shown here, it’s fairly easy to create a basic Ray actor and manage its life cycle, but what happens if the Ray node on which the actor is running goes down for some reason[1]? The @ray.remote decorator allows you to specify two parameters that control behavior in this case:
- max_restarts, which specifies the maximum number of times that the actor should be restarted when it dies unexpectedly. The minimum valid value is 0 (default), which indicates that the actor doesn’t need to be restarted. A value of -1 indicates that an actor should be restarted indefinitely.
- max_task_retries, which specifies how many times to retry an actor’s task if the task fails due to a system error. If set to -1, the system will retry the failed task until the task succeeds or the actor has reached its max_restarts limit. If set to n > 0, the system will retry the failed task up to n times, after which the task will throw a RayActorError exception upon ray.get.
As further explained in the next chapter, when an actor is restarted, Ray recreates its state by rerunning its constructor. This means that any state changed during the actor’s execution is lost. To preserve such state, an actor has to implement its own persistence.
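The effect of a restart can be illustrated without Ray. The following plain-Python sketch uses a hypothetical `Supervisor` class (not a Ray API) that rebuilds an object by calling its constructor again, with a restart budget loosely modeled on the `max_restarts` semantics described above (0 means never restart, -1 means restart indefinitely).

```python
class Counter:
    def __init__(self, start: int = 0):
        self.value = start          # state built by the constructor

    def increment(self) -> int:
        self.value += 1
        return self.value


class Supervisor:
    """Hypothetical stand-in for a restart policy; not a Ray API."""

    def __init__(self, factory, max_restarts: int):
        self._factory = factory
        self._restarts_left = max_restarts
        self.instance = factory()

    def restart(self) -> bool:
        """Rebuild the object by rerunning its constructor, if allowed."""
        if self._restarts_left == 0:
            return False
        if self._restarts_left > 0:     # -1 means restart indefinitely
            self._restarts_left -= 1
        self.instance = self._factory()
        return True


supervisor = Supervisor(lambda: Counter(start=10), max_restarts=1)
supervisor.instance.increment()
supervisor.instance.increment()
before = supervisor.instance.value      # 12
restarted = supervisor.restart()        # simulated crash and restart
after = supervisor.instance.value       # 10: the two increments are gone
second = supervisor.restart()           # False: restart budget is used up
print(before, restarted, after, second)  # 12 True 10 False
```

Only what the constructor sets up survives the restart, which is why state that changes over time needs separate persistence.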
In our example case, the actor’s state is lost on failure since we haven’t used actor persistence. This might be okay for some use cases, but not acceptable for others (see also this Ray pattern). In the next section, you will learn how to programmatically implement custom actor persistence.
In this implementation, the state is saved as a whole, which works well enough if the size of the state is relatively small and state changes are relatively rare. Also, to keep the example simple, local disk persistence is used. In practice, for a distributed Ray deployment, you should consider using NFS, S3, or a database so that the actor’s data is accessible from any node in the Ray cluster.
The persistent Account actor is presented below[2] in Define a persistent actor, using filesystem persistence.
Because the Actor model defines an actor’s interactions through messages, another common approach to actor’s persistence used in many commercial implementations is event sourcing - persisting a state as a sequence of state-changing events. Such an approach is especially important when the size of the state is large and events are relatively small because it significantly decreases the amount of data saved for every actor’s invocation and consequently improves actors’ performance. This implementation can be arbitrarily complex and include snapshotting, etc.
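The event-sourcing alternative can be sketched in a few lines of plain Python. This is an illustrative sketch only, not the approach used in the chapter's examples: the `EventSourcedAccount` class, the JSON-lines log format, and the file name are all assumptions. Each state change appends one small event to a log, and the state is rebuilt by replaying the log.

```python
import json
import os
import tempfile


class EventSourcedAccount:
    """Rebuilds its balance by replaying an append-only event log."""

    def __init__(self, log_path: str):
        self.log_path = log_path
        self.balance = 0.0
        if os.path.exists(log_path):
            with open(log_path) as log:
                for line in log:
                    self._apply(json.loads(line))

    def _apply(self, event: dict) -> None:
        if event["type"] == "deposit":
            self.balance += event["amount"]
        elif event["type"] == "withdraw":
            self.balance -= event["amount"]

    def _record(self, event: dict) -> None:
        # Persist only the small event, then update the in-memory state.
        with open(self.log_path, "a") as log:
            log.write(json.dumps(event) + "\n")
        self._apply(event)

    def deposit(self, amount: float) -> float:
        self._record({"type": "deposit", "amount": amount})
        return self.balance

    def withdraw(self, amount: float) -> float:
        self._record({"type": "withdraw", "amount": amount})
        return self.balance


log_file = os.path.join(tempfile.mkdtemp(), "account-1234.events")
account = EventSourcedAccount(log_file)
account.deposit(100.0)
account.withdraw(40.0)

recovered = EventSourcedAccount(log_file)   # simulates a restart
print(recovered.balance)  # 60.0
```

The cost of each write is proportional to the event, not to the whole state; the trade-off is that recovery time grows with the log, which is what snapshotting addresses.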
link:examples/ray_examples/actors/persistent_account.py[role=include]
If we compare this implementation with the original one (Implementing Ray remote actor), we will notice several important changes:
- The constructor has two additional parameters: account_key and basedir. The account key is a unique identifier for the account that is also used as the name of the persistence file. The basedir parameter is the base directory used for storing persistence files. When the constructor is invoked, we first check whether a persistent state is saved for this account; if there is one, we ignore the passed-in balance and minimum balance and restore them from persistence.
- Two additional methods are added to the class: store_state and restore_state. The store_state method stores the actor’s state in a file. State information is represented as a dictionary, with the names of the state elements as keys and the state elements’ values as values. We use Ray’s implementation of cloud pickling[3] to convert this dictionary to a byte string and then write this byte string to the file defined by the account key and base directory. The restore_state method restores the state from the file defined by the account key and base directory. It reads a byte string from the file and uses Ray’s implementation of cloud pickling to convert it back to a dictionary. Then it uses the content of the dictionary to populate the state.
- Finally, both the deposit and withdraw methods, which change the state, use the store_state method to update persistence.
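The store and restore logic described in this list can be sketched as follows. This is a hedged approximation rather than the book's listing: it uses the standard library's `pickle` in place of Ray's cloud pickling so that it runs without Ray, the class name `PersistentAccount` is hypothetical, and only `deposit` is shown among the state-changing methods.

```python
import os
import pickle
import tempfile


class PersistentAccount:
    def __init__(self, balance, minimal_balance, account_key, basedir="."):
        self.basedir = basedir
        self.key = account_key
        if not self.restore_state():
            # No saved state: fall back to the constructor arguments.
            if balance < minimal_balance:
                raise ValueError("Starting balance is below the minimum")
            self.balance = balance
            self.minimal = minimal_balance
            self.store_state()

    def _path(self) -> str:
        return os.path.join(self.basedir, str(self.key))

    def store_state(self) -> None:
        # The whole state is written as one pickled dictionary.
        state = {"balance": self.balance, "minimal": self.minimal}
        with open(self._path(), "wb") as f:
            f.write(pickle.dumps(state))

    def restore_state(self) -> bool:
        if not os.path.exists(self._path()):
            return False
        with open(self._path(), "rb") as f:
            state = pickle.loads(f.read())
        self.balance = state["balance"]
        self.minimal = state["minimal"]
        return True

    def deposit(self, amount: float) -> float:
        self.balance += amount
        self.store_state()          # every state change is persisted
        return self.balance


basedir = tempfile.mkdtemp()
first = PersistentAccount(100.0, 20.0, account_key="1234", basedir=basedir)
first.deposit(30.0)

# A "restarted" account ignores the passed-in balances and restores the saved ones.
second = PersistentAccount(0.0, 0.0, account_key="1234", basedir=basedir)
print(second.balance, second.minimal)  # 130.0 20.0
```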
The implementation shown in Define a persistent actor, using filesystem persistence works fine, but our account actor implementation now contains too much persistence-specific code and is tightly coupled to file persistence. A better solution is to move the persistence-specific code into a separate class.
We start by defining an abstract class that declares the methods every persistence class has to implement (Base persistence class):
link:examples/ray_examples/actors/persistent_account_class.py[role=include]
This class defines all the methods that have to be implemented by a concrete persistence implementation. With this in place, a file persistence class implementing base persistence can be defined as follows (File persistence class):
link:examples/ray_examples/actors/persistent_account_class.py[role=include]
This implementation factors out most of the persistence-specific code from our original implementation (Define a persistent actor, using filesystem persistence). Now it is possible to simplify and generalize the account implementation (Persistent actor with pluggable persistence):
link:examples/ray_examples/actors/persistent_account_class.py[role=include]
Only the code changes from our original persistent actor implementation (Define a persistent actor, using filesystem persistence) are shown here. Note that the constructor now takes a BasePersistence instance, which makes it easy to change the persistence implementation without changing the actor’s code. Additionally, the restore_state and store_state methods are generalized to move all of the persistence-specific code to the persistence class.
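The separation described here can be sketched in plain Python as follows. `BasePersistence` is the name used in the text; the method names (`exists`, `save`, `restore`), the `InMemoryPersistence` backend, and the use of the standard `pickle` module are assumptions made for this sketch, which is not the book's listing.

```python
import os
import pickle
import tempfile
from abc import ABC, abstractmethod


class BasePersistence(ABC):
    """Methods every persistence implementation must provide (assumed names)."""

    @abstractmethod
    def exists(self, key: str) -> bool: ...

    @abstractmethod
    def save(self, key: str, data: dict) -> None: ...

    @abstractmethod
    def restore(self, key: str) -> dict: ...


class FilePersistence(BasePersistence):
    def __init__(self, basedir: str = "."):
        self.basedir = basedir

    def _path(self, key: str) -> str:
        return os.path.join(self.basedir, key)

    def exists(self, key: str) -> bool:
        return os.path.exists(self._path(key))

    def save(self, key: str, data: dict) -> None:
        with open(self._path(key), "wb") as f:
            f.write(pickle.dumps(data))

    def restore(self, key: str) -> dict:
        with open(self._path(key), "rb") as f:
            return pickle.loads(f.read())


class InMemoryPersistence(BasePersistence):
    """Hypothetical alternative backend, handy for tests."""

    def __init__(self):
        self._data = {}

    def exists(self, key: str) -> bool:
        return key in self._data

    def save(self, key: str, data: dict) -> None:
        self._data[key] = dict(data)

    def restore(self, key: str) -> dict:
        return dict(self._data[key])


class Account:
    def __init__(self, balance, minimal_balance, account_key,
                 persistence: BasePersistence):
        self.persistence = persistence
        self.key = account_key
        if not self.restore_state():
            self.balance = balance
            self.minimal = minimal_balance
            self.store_state()

    def store_state(self) -> None:
        self.persistence.save(
            self.key, {"balance": self.balance, "minimal": self.minimal})

    def restore_state(self) -> bool:
        if not self.persistence.exists(self.key):
            return False
        state = self.persistence.restore(self.key)
        self.balance = state["balance"]
        self.minimal = state["minimal"]
        return True

    def deposit(self, amount: float) -> float:
        self.balance += amount
        self.store_state()
        return self.balance


# The account code is identical for both backends.
for backend in (FilePersistence(tempfile.mkdtemp()), InMemoryPersistence()):
    Account(100.0, 20.0, "1234", backend).deposit(30.0)
    restored = Account(0.0, 0.0, "1234", backend)
    print(type(backend).__name__, restored.balance)  # 130.0 for both
```

Swapping the backend requires no change to `Account`, which is the point of passing the persistence object into the constructor.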
This implementation is flexible enough to support different persistence implementations, but if a persistence implementation requires permanent connections to a persistence source (for example DB connection), it can become unscalable by simultaneously maintaining too many connections. In this case, we can implement persistence as an additional actor. But this requires scaling of this actor. Let’s take a look at the options that Ray provides for scaling actors.
The original actor model, described earlier in this chapter, typically assumes that actors are lightweight (for example, containing a single piece of state) and do not require scaling or parallelization. In Ray and similar systems[4], actors are often used for coarser-grained[5] implementations and can require scaling.
As with Ray remote functions, you can scale actors either horizontally (across processes/machines) with "pools" or vertically (with more resources). The [ray_resources] section covers how to request more resources, but for now, let’s focus on horizontal scaling.
You can add more processes for horizontal scaling with Ray’s actor pool, provided by the ray.util module. This class is similar to a multiprocessing pool and lets you schedule your tasks over a fixed pool of actors.
It effectively uses a fixed set of actors as a single entity and manages which actor in the pool gets the next request. Note that actors in the pool are still individual actors and their state is not merged. So this scaling option only works in the case when an actor’s state is created in the constructor and does not change during the actor’s execution.
Let’s take a look at how to use an actor pool to improve the scalability of our account class with a persistence actor (Using an actor’s pool for implementing persistence).
link:examples/ray_examples/actors/persistent_account_2actors_pool.py[role=include]
Only the code changes from our original implementation are shown here (the complete code is here). The code starts by creating a pool of 3 identical File persistence actors and then this pool is passed to an account implementation.
The syntax of a pool-based execution is a lambda function that takes two parameters: an actor reference and a value to be submitted to the function. The limitation here is that the value is a single object. One solution for functions with multiple parameters is to use a tuple, which can contain an arbitrary number of components. The function itself is defined as a remote function on the required actor’s method.
Execution on the pool is asynchronous (it just routes requests to one of the remote actors internally). This speeds up execution of the store_state method, which does not need the results of data storage: the implementation does not wait for the state to be stored, it just starts the execution. The restore_state method, on the other hand, needs the result of the pool invocation to proceed. The pool implementation internally manages the process of waiting for execution results to become ready and exposes this functionality through the get_next() function (note that this is a blocking call). The pool also manages a queue of execution results (in the same order as the requests). This means that whenever we need to get a result from the pool, we must first clear out the pool’s results queue to ensure that we get the right result.
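The result-queue behavior just described can be illustrated without Ray. In the sketch below, `TinyPool` is a hypothetical stand-in built on a thread pool; it only mimics the submit, has_next, and get_next calls discussed above and is not Ray's actor pool (for example, it assigns workers round-robin, which is a simplification). `DictStore` is likewise a toy persistence worker.

```python
from concurrent.futures import ThreadPoolExecutor


class TinyPool:
    """Hypothetical stand-in mimicking submit/has_next/get_next."""

    def __init__(self, workers):
        self._workers = list(workers)
        self._executor = ThreadPoolExecutor(max_workers=len(self._workers))
        self._pending = []      # futures, kept in submission order
        self._next = 0

    def submit(self, fn, value):
        # fn receives (worker, value), like the lambda described above.
        worker = self._workers[self._next % len(self._workers)]
        self._next += 1
        self._pending.append(self._executor.submit(fn, worker, value))

    def has_next(self):
        return bool(self._pending)

    def get_next(self):
        # Blocks until the oldest submitted request has finished.
        return self._pending.pop(0).result()


class DictStore:
    """Toy persistence worker; all workers share one dictionary."""
    shared = {}

    def save(self, key, data):
        DictStore.shared[key] = data

    def restore(self, key):
        return DictStore.shared.get(key)


pool = TinyPool([DictStore() for _ in range(3)])

# Fire-and-forget write: several arguments are packed into one tuple.
pool.submit(lambda w, v: w.save(v[0], v[1]), ("1234", {"balance": 90.0}))

# Before reading, drain results left over from earlier submissions.
while pool.has_next():
    pool.get_next()

pool.submit(lambda w, v: w.restore(v), "1234")
state = pool.get_next()
print(state)  # {'balance': 90.0}
```

Without the draining loop, the first `get_next()` call would return the result of the earlier save (here `None`) rather than the restored state.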
In addition to the multiprocessing-based scaling provided by the actor’s pool, Ray also supports scaling of the actor’s execution through concurrency. Ray offers two types of concurrency within an actor - threading and async execution.
When using concurrency inside actors, keep in mind that Python’s Global Interpreter Lock (GIL) allows only one thread of Python code to run at a time. This means that pure Python will not provide true parallelism. On the other hand, if you invoke NumPy, Cython, TensorFlow, or PyTorch code, these libraries release the GIL when calling into C/C++ functions. By overlapping the time spent waiting for IO or working in native libraries, both threaded and async actor execution can achieve some parallelism.
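The overlap of waiting time can be seen with plain Python threads, independent of Ray. In this small sketch, `blocking_io` is a hypothetical stand-in for an IO call; `time.sleep` releases the GIL in the same way a real blocking IO wait would.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io(seconds: float) -> float:
    time.sleep(seconds)     # sleeping releases the GIL, like a real IO wait
    return seconds


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(blocking_io, [0.2] * 4))
elapsed = time.perf_counter() - start

# Four 0.2-second waits overlap, so the wall time stays well below 0.8 seconds.
print(f"requested {sum(results):.1f}s of waiting, took {elapsed:.2f}s")
```

Replacing the sleep with a pure-Python CPU loop would remove most of the benefit, because only one thread can execute Python bytecode at a time.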
AsyncIO can be thought of as cooperative multitasking, where your code or libraries need to explicitly signal that they are waiting on a result, so that Python can go ahead and execute another task by explicitly switching execution context. AsyncIO works by having a single process run an event loop and change which task it is executing when a task yields/awaits. AsyncIO tends to have lower overhead than multithreaded execution and can be a little easier to reason about. Ray actors, but not remote functions, integrate with AsyncIO, allowing you to write asynchronous actor methods.
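Cooperative switching can be observed directly with the standard asyncio module. In this minimal sketch (plain Python, no Ray; the `worker` coroutine and the `order` list are illustrative names), each task runs until it awaits, at which point the event loop switches to the other task.

```python
import asyncio

order = []


async def worker(name: str, steps: int) -> str:
    for step in range(steps):
        order.append(f"{name}{step}")
        await asyncio.sleep(0)      # explicitly yield control to the event loop
    return name


async def main():
    # Both coroutines run on one thread; they interleave only at await points.
    return await asyncio.gather(worker("a", 2), worker("b", 2))


finished = asyncio.run(main())
print(order)     # ['a0', 'b0', 'a1', 'b1']
print(finished)  # ['a', 'b']
```

If a worker never awaited, it would run to completion before the other one started, which is why blocking calls inside async code stall everything else on the loop.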
You should use threaded execution when your code spends a lot of time blocking but not yielding control by calling await. Threads are managed by the operating system, which decides when to run which thread. Using threaded execution can involve fewer code changes, as you do not need to be explicit about where your code is yielding. This can also make threaded execution more difficult to reason about.
You need to be careful and selectively use locks when accessing or modifying objects with both threads and AsyncIO. In both approaches, your objects share the same memory. By using locks, you ensure that only one thread or task can access a specific piece of memory at a time. Locks have some overhead[6]. As a result, actor concurrency is mostly applicable to use cases where the state is populated in the constructor and never changes.
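As a small illustration of guarding shared memory with a lock, here is a plain-Python threading sketch (not Ray-specific; the `SharedCounter` class is hypothetical). The increment is a read-modify-write sequence, so it is wrapped in a lock to keep concurrent updates from being lost.

```python
import threading


class SharedCounter:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self) -> None:
        # The read-modify-write below is not atomic, so guard it.
        with self._lock:
            self.value += 1


counter = SharedCounter()


def work() -> None:
    for _ in range(10_000):
        counter.increment()


threads = [threading.Thread(target=work) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(counter.value)  # 40000
```

Every increment now pays for acquiring and releasing the lock, which is the overhead the text refers to; state that is only read after construction needs no such protection.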
To create an actor that uses AsyncIO, you need to define at least one async method. In this case, Ray will create an AsyncIO event loop for executing the actor’s methods. Submitting tasks to these actors is the same from the caller’s perspective as submitting tasks to a regular actor. The only difference is that when the task is run on the actor, it is posted to an AsyncIO event loop running in a background thread or thread pool instead of running directly on the main thread. Note that blocking ray.get or ray.wait calls are not allowed inside an async actor method, because they block the execution of the event loop.
An example of a very simple async actor is presented below in Simple async actor:
link:examples/ray_examples/actors/async_actor.py[role=include]
Because the computation method is defined as async, Ray will create an async actor. Note that unlike ordinary async methods, which require await to invoke them, Ray async actors do not require any special invocation semantics. Additionally, Ray allows you to specify the maximum concurrency for an async actor’s execution when creating the actor:
actor = AsyncActor.options(max_concurrency=5).remote()
To create a threaded actor you need to specify max_concurrency during actor creation (Simple threaded actor).
from time import sleep

import ray

@ray.remote
class ThreadedActor:
    def computation(self, num):
        print(f'Actor waiting for {num} sec')
        for x in range(num):
            sleep(1)
            print(f'Actor slept for {x+1} sec')
        return num

# No async methods plus max_concurrency > 1 makes this a threaded actor
actor = ThreadedActor.options(max_concurrency=3).remote()
Tip
Because both async and threaded actors use max_concurrency, it might be a little confusing which type of actor is created. The thing to remember is that when max_concurrency is used, the actor can be either an async or a threaded actor. If at least one of the actor’s methods is async, then it is an async actor; otherwise, it is a threaded one.
So, which scaling approach should we use for our implementation? This blog post provides a good summary of features for different approaches (Comparison of different actor’s scaling approaches).
Scaling approach | Feature | Usage criteria
---|---|---
Actor pool | Multiple processes, high CPU utilization | CPU-bound
Async actor | Single process, single thread, cooperative multitasking, tasks cooperatively decide on switching | Slow IO-bound
Threaded actor | Single process, multiple threads, preemptive multitasking, OS decides on task switching | Fast IO-bound and non-async libraries you do not control
Because Ray remote actors are effectively remote functions, all of the Ray remote best practices described in the previous chapter are applicable. In addition, there are some actor-specific best practices.
As mentioned before, Ray offers support for actor fault tolerance. Specifically, you can specify max_restarts to enable automatic restarts for Ray actors. This means that when your actor or the node hosting it crashes, the actor will be automatically reconstructed. However, this does not restore application-level state in your actor. Consider the actor persistence approaches described in this chapter to ensure that application-level state is restored as well.
If your application has global variables that need to change, do not change them in remote functions. Instead, use actors to encapsulate them and access them through the actor’s methods. Remote functions run in different processes and do not share the same address space, so these changes are not reflected across the Ray driver and remote functions.
One common application use case is the execution of the same remote function many times for different data sets. Using remote functions directly can cause delays due to the creation of new processes for each of the functions. It can also overwhelm the Ray cluster with a large number of processes. A more controlled option for such a use case is an actor pool. In this case, the pool provides a controlled set of workers that are readily available (no process creation delay) for execution. Because the pool maintains its own request queue, the programming model for this option is identical to starting independent remote functions, but it provides a better-controlled execution environment.
In this chapter, you have learned how to use Ray remote actors to implement stateful execution in Ray. You have learned about the actor model and how to implement Ray remote actors. Note that Ray internally relies heavily on actors, for example, for multi-node synchronization, streaming (see [ch06]), and microservices implementation (see [ch07]). Actors are also widely used for ML implementations; see, for example, the usage of actors for implementing a parameter server.
You have also learned how to improve an actor’s reliability by implementing an actor’s persistence and saw a simple example of persistence implementation.
Finally, you have learned about the options that Ray provides for scaling actors, their implementation, and tradeoffs.
In the next chapter, we will discuss additional Ray design details.