
What is Ray, and where does it fit?

Ray is primarily a Python tool[1] for "Fast and Simple Distributed Computing."

The same lab[2] at the University of California, Berkeley, that created the initial software that eventually became Apache Spark also created the first version of Ray. Researchers from that lab have since started a company, Anyscale, to continue developing Ray and to offer products and services around it.

The goal of Ray is to solve a wider variety of problems than its ancestors, supporting various scalable programming models that range from actors to machine learning to data parallelism. Its remote function and actor models make it a truly general-purpose development environment rather than a "big data"-only one.

Ray automatically scales compute resources as needed, allowing you to focus on your code instead of managing servers. Ray can manage and scale cloud resources directly on its own (using ray up) or by using a cluster manager like Kubernetes. In addition to traditional horizontal scaling (e.g., adding more machines), Ray can schedule tasks to take advantage of different machine sizes and accelerators such as GPUs.
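
For example, resource requirements can be declared per task. The following is a minimal sketch using Ray's @ray.remote decorator; num_cpus and num_gpus are real Ray parameters, but the workload itself is purely hypothetical:

    import ray

    ray.init()

    # Ask the scheduler to place this task on a node with 2 CPUs and 1 GPU free.
    # On a machine with no GPU, Ray will wait until a GPU node becomes available.
    @ray.remote(num_cpus=2, num_gpus=1)
    def train_shard(data):
        # Hypothetical GPU-bound work would go here.
        return len(data)

    result = ray.get(train_shard.remote([1, 2, 3]))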

Since the introduction of AWS Lambda, interest in serverless computing[3] has exploded. Ray provides a solid foundation for a general-purpose serverless platform by offering the following features:

  • It hides servers. Ray autoscaling transparently manages servers based on application requirements.

  • By providing an actor programming model, Ray supports not only the stateless computation typical of most serverless implementations but also a stateful programming model.

  • It allows you to specify the resources, including hardware accelerators, required for the execution of your serverless functions.

  • It supports direct communication between your tasks, providing support not only for simple functions but also for complex distributed applications.

Ray provides a wealth of libraries built on top of these serverless capabilities, simplifying the creation of applications that take full advantage of them. Normally, you would need different tools for everything from data processing to workflow management. By using a single tool for a larger portion of your application, you simplify not only development but also operational management.

In this chapter, we’ll look at where Ray fits in the ecosystem and help you decide if it’s a good fit for your project.

Why do you need Ray?

We often need something like Ray when our problems get too big to handle in a single process. Depending on how large our problems get, this can mean scaling from multicore all the way to multicomputer, all of which Ray supports. If you find yourself wondering how you can handle next month's growth in users, data, or complexity, our hope is that you will take a look at Ray. Ray exists because scaling software is hard, and it tends to be the kind of problem that gets harder rather than simpler with time.

Not only can Ray scale to multiple computers, it also scales without you having to directly manage servers. Leslie Lamport has said, "A distributed system is one in which the failure of a computer you didn’t even know existed can render your own computer unusable." While this kind of failure is still possible, Ray is able to automatically recover from many types of failures.

Ray runs cleanly on your laptop as well as at scale with the same APIs. This provides a simple starting point: you do not need to go to the cloud to start experimenting with Ray. Once you feel comfortable with the APIs and application structure, you can move your code to the cloud for better scalability without needing to modify it. This fills a need that exists between a distributed system and a single-threaded application. Ray is able to manage multiple threads and GPUs with the same abstractions it uses for distributed computing.

Where can you run Ray?

Ray can be deployed in a variety of environments, ranging from your laptop, to the cloud, to cluster managers like Kubernetes or YARN, to six Raspberry Pis hidden under your desk.[4] In local mode, getting started can be as simple as a pip install and a call to ray.init().[5]
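
To give a rough sense of how little is needed after pip install ray, a minimal local-mode session looks like the following sketch; the hello function is just a placeholder:

    import ray

    # Starts an embedded, single-node Ray instance on your laptop.
    ray.init()

    @ray.remote
    def hello(name):
        return f"Hello, {name}!"

    # .remote() schedules the task; ray.get() blocks until the result is ready.
    print(ray.get(hello.remote("Ray")))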

Ray Cluster

A Ray cluster consists of a head node and a set of worker nodes.

[Figure: a Ray cluster consisting of a head node and a set of worker nodes]

As you can see from the figure above, a head node, in addition to supporting all the functionality of a worker node, has two additional components:

  • A global control store containing cluster-wide information, including the object table, task table, function table, and event logs. The contents of this store are used by the web UI and by error diagnostics, debugging, and profiling tools.

  • An autoscaler, which launches and terminates worker nodes to ensure that workloads have sufficient resources to run while minimizing idle resources.

The head node is effectively a master (a singleton[6]) that manages the complete cluster (via the autoscaler).

Each Ray node contains a raylet, which consists of two main components:

  • Object store - all of the object stores in a cluster are connected, and you can think of the collection as somewhat similar to Memcached, a distributed cache (see the sketch after this list).

  • Scheduler - each Ray node provides a local scheduler; these local schedulers communicate with one another, together forming a unified distributed scheduler for the cluster.
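
As a minimal sketch of how the object store appears in application code, ray.put places a value into the store and returns a reference that any task in the cluster can resolve; the total function is purely illustrative:

    import ray

    ray.init()

    # Put a value into the distributed object store and get back a reference.
    data_ref = ray.put(list(range(1000)))

    @ray.remote
    def total(data):
        return sum(data)

    # Passing the reference avoids re-serializing the data for each task.
    print(ray.get(total.remote(data_ref)))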

When we are talking about nodes in a Ray cluster, we are not talking about physical machines but rather about logical nodes based on Docker images. As a result, when mapping to physical machines, a given physical node can run one or more logical nodes.

ray up, which is included as part of Ray, allows you to create clusters (a sample cluster configuration follows this list). It will:

  • Provision a new instance/machine (if running on the cloud[7] or a cluster manager) using the provider's SDK, or access existing machines if running directly on physical hardware.

  • Execute shell commands to set up Ray with the desired options.

  • Run any custom, user-defined setup commands (for example, setting environment variables and installing packages).

  • Initialize the Ray cluster.

  • Deploy an autoscaler if required.
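
To give a feel for what ray up consumes, here is a minimal sketch of a cluster configuration file. The fields shown are a subset of the autoscaler configuration schema, and the values (cluster name, region, setup commands) are placeholders; consult the Ray documentation for the full schema for your provider and Ray version:

    # A minimal sketch of a Ray cluster configuration (placeholder values).
    cluster_name: my-first-cluster
    max_workers: 2

    provider:
        type: aws            # also: azure, gcp
        region: us-west-2

    auth:
        ssh_user: ubuntu

    setup_commands:
        - pip install -U ray

With a file like this saved as, say, cluster.yaml, ray up cluster.yaml brings the cluster up, and ray down cluster.yaml tears it down.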

In addition to ray up, if you are running on Kubernetes, you can use the Ray Kubernetes operator. Although ray up and the Kubernetes operator are the preferred ways of creating a Ray cluster, if you have a set of existing machines (either physical or virtual), you can set up a Ray cluster manually.

Regardless of which deployment approach you take, the same Ray code should work everywhere.[8] We’ll look more at running Ray in local mode in the next chapter, and if you want to scale even more we’ll cover deploying to the cloud and resource managers in [appB].

Running your code with Ray

Ray is more than just a library you import; it is also a cluster management tool. In addition to importing the library, you need to "connect" to a Ray cluster. You have three options for how to connect your code to a Ray cluster (a code sketch follows the list):

  • Calling ray.init() with no arguments. This launches an embedded, single-node Ray instance that is immediately available to the application.

  • Using the Ray client, ray.init("ray://<head_node_host>:10001"), to connect to a Ray cluster. By default, each Ray cluster launches with a Ray client server running on the head node that can receive remote client connections. Note, however, that when the client is located remotely, some operations run directly from the client may be slower due to WAN latency. Ray is not resilient to network failures between the head node and the client.

  • Using the Ray command-line API: you can use the ray submit command to execute Python scripts on clusters. This copies the designated file onto the head node of the cluster and executes it with the given arguments. If you pass parameters, your code should use Python's sys module, which provides access to command-line arguments via sys.argv. This removes the potential networking point of failure that exists when using the Ray client.
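
The first two options look roughly like this in code; the head node host name is a placeholder, and 10001 is the default Ray client port:

    import ray

    # Option 1: start an embedded, single-node Ray instance.
    ray.init()

    # Option 2: connect to an existing cluster through the Ray client
    # (replace the host with your head node's address).
    # ray.init("ray://head-node.example.com:10001")

The third option is driven from the shell, for example ray submit cluster.yaml my_script.py, where cluster.yaml is the cluster configuration used with ray up.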

Where does it fit in the ecosystem?

Ray sits at a unique intersection of problem spaces.

The first problem that Ray solves is that of scaling your Python code by managing resources, be it servers, threads, or GPUs. Ray’s core building blocks are a scheduler, distributed data storage, and actor system. The scheduler that Ray uses is general purpose enough to exist in the space of workflow scheduling, not just with "traditional" problems of scale. Ray’s actor system gives you a simple way of handling resilient distributed execution state.[9]
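
To make the actor system concrete, here is a minimal sketch of a stateful actor; the Counter class is purely illustrative:

    import ray

    ray.init()

    @ray.remote
    class Counter:
        # A tiny stateful actor: each call mutates state held by the actor process.
        def __init__(self):
            self.value = 0

        def increment(self):
            self.value += 1
            return self.value

    counter = Counter.remote()
    # Method calls are scheduled on the actor and return object references.
    print(ray.get([counter.increment.remote() for _ in range(3)]))  # [1, 2, 3]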

In addition to the scalable building blocks, Ray has higher-level libraries such as Serve, Data, Tune, RLlib, Train and Workflows that exist in the machine learning problem space. These are designed to be used by folks with more of a data science background than necessarily a distributed systems background.

The overall Ray ecosystem is presented in Figure 1 below:

Figure 1. Ray ecosystem

Let’s take a look at some of the different problem spaces and see how Ray fits in and compares with existing tools.

The following table compares Ray to several related system categories.

Table 1. Comparing Ray to related systems

Cluster Orchestrators

Cluster orchestrators, like Kubernetes, SLURM, and YARN, schedule containers. Ray can leverage these for allocating cluster nodes.

Parallelization Frameworks

Compared to Python parallelization frameworks such as multiprocessing or Celery, Ray offers a more general, higher-performance API. In addition, Ray's distributed objects support data sharing across parallel executors.

Data Processing Frameworks

Ray's lower-level APIs are more flexible and better suited for a "distributed glue" framework than existing data processing frameworks such as Spark, Mars, or Dask. Although Ray has no inherent understanding of data schemas, relational tables, or streaming dataflow, it supports running many of these data processing frameworks on top of it, for example Modin, Dask on Ray, Mars on Ray, and RayDP (Spark on Ray).

Actor Frameworks

Unlike specialized actor frameworks such as Erlang, Akka, and Orleans, Ray integrates its actor framework directly into the host programming language. In addition, Ray's distributed objects support data sharing across actors.

Workflows

When most people talk about workflows, they mean UI- or script-driven low-code development. While this approach might be very useful for nontechnical users, it frequently brings more pain than value to software engineers. Ray instead takes a programmatic approach to workflows (compare with Cadence). This implementation combines the flexibility of Ray's dynamic task graphs with strong durability guarantees. It offers sub-second overheads for task launch and supports workflows with hundreds of thousands of steps. It also takes advantage of the Ray object store to pass distributed datasets between steps.

HPC Systems

Unlike Ray, which exposes task and actor APIs, the majority of HPC systems expose lower-level messaging APIs, providing greater application flexibility. Additionally, many HPC implementations offer optimized collective communication primitives. Ray provides a collective communication library that implements many of these capabilities.

"Big" Data / Scalable DataFrames

Ray offers a few different APIs for scalable dataframes, a cornerstone of the big data ecosystem. Ray builds on top of the Apache Arrow project to provide a (limited) distributed dataframe API called ray.data.Dataset. This is largely intended for the simplest of transformations and for reading from cloud or distributed storage. Beyond that, Ray also provides support for a more pandas-like experience through Dask on Ray, which leverages the Dask interface on top of Ray.
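
As a rough sketch of the ray.data.Dataset API (the exact record format returned by these methods has varied across Ray versions, so treat this as illustrative rather than exact):

    import ray

    ray.init()

    # Build a tiny Dataset and apply a per-record transformation in parallel.
    # (In recent Ray versions, records are wrapped in dicts, so the exact
    # lambda signature depends on your Ray version.)
    ds = ray.data.from_items([1, 2, 3, 4, 5])
    print(ds.map(lambda x: x * 2).take(5))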

Warning

In addition to the libraries above, you may find references to Mars on Ray or Ray’s (deprecated) built-in pandas support. These libraries do not support distributed mode, so they can limit your scalability.

This is a rapidly evolving area and something to keep your eye on in the future.

Ray and Spark

It is tempting to compare Ray with Apache Spark, and in some abstract ways, they are very similar. From a user’s point of view, Apache Spark is ideal for data-intensive tasks, and Ray is better suited to compute-intensive tasks.

Ray has a lower task overhead and support for distributed state, making it especially appealing for machine learning tasks. Ray's lower-level APIs make it a more appealing platform to build tools on top of.

Spark has more data tools but depends on centralized scheduling and state management. This centralization makes implementing reinforcement learning and recursive algorithms a challenge. For analytical use cases, especially in existing big data deployments, Spark may be a better choice.

Ray and Spark are complementary and can be used together. A common pattern is to do data processing with Spark and then machine learning with Ray. In fact, the RayDP library provides a way to use Spark DataFrames inside of Ray.

We cover scalable dataframes in [ch09].

Machine learning

Ray has multiple machine learning libraries, and for the most part, they delegate much of the fancy parts to existing tools like PyTorch, scikit-learn, and TensorFlow while using Ray's distributed computing facilities to scale. Ray Tune implements hyperparameter tuning, using Ray's ability to train many local Python-based models in parallel across a distributed set of machines. Ray Train implements distributed training with PyTorch or TensorFlow. Ray's RLlib interface offers reinforcement learning with a number of core algorithms.
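
As a minimal sketch of Ray Tune using the classic tune.run/tune.report API (the objective function and search space here are made up for illustration, and newer Ray versions favor a slightly different reporting API):

    from ray import tune

    def objective(config):
        # A stand-in for real model training: report a score for this trial.
        score = (config["x"] - 3) ** 2
        tune.report(score=score)

    analysis = tune.run(
        objective,
        config={"x": tune.uniform(0, 10)},  # search space
        num_samples=10,                     # number of trials
    )
    print(analysis.get_best_config(metric="score", mode="min"))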

Part of what allows Ray to stand out from pure data-parallel systems for machine learning is its actor model, which allows easier tracking of “state” - like parameters and inter-worker communication. You can use this to implement your own custom algorithms that are not a part of Ray core.

We cover ML in more detail in [ch10] .

Workflow scheduling

Workflow scheduling is one of those areas that, at first glance, can seem really simple. It's "just" a graph of work that needs to be done. However, all programs can be expressed as "just" a graph of work that needs to be done. New in 2.0, Ray has a workflow library to simplify expressing both traditional business logic workflows and large-scale (e.g., ML training) workflows.

Ray is unique in workflow scheduling because it allows tasks to schedule other tasks without having to call back to a central node. This allows for greater flexibility and throughput.
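
The following minimal sketch shows the underlying capability: a remote task launching other remote tasks directly, with no central coordinator involved. The functions are placeholders, not the Workflows API itself:

    import ray

    ray.init()

    @ray.remote
    def process(item):
        return item * 2

    @ray.remote
    def fan_out(items):
        # This task schedules further tasks itself, without going back
        # to the driver or to any central node.
        return ray.get([process.remote(i) for i in items])

    print(ray.get(fan_out.remote([1, 2, 3])))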

If you find Ray’s workflow engine too low-level, you can use Ray to run Apache Airflow. Airflow is one of the more popular workflow scheduling engines in the big data space. The Ray Airflow Provider lets you use your Ray cluster as a worker pool for Airflow.

We cover workflow scheduling in [ch08].

Streaming

Streaming is generally considered to be processing "real-time-ish" data, or data "as-it-arrives-ish." Streaming adds another layer of complexity, especially the closer to real-time you try to get, as not all of your data will always arrive in order or on time. Ray offers some standard streaming primitives and can use Kafka as a streaming data source and sink. Ray uses its actor model APIs to interact with streaming data.

Ray streaming, like many streaming systems bolted on batch systems, has some interesting quirks. Ray streaming, notably, implements more of its logic in Java, unlike the rest of Ray. This can make debugging streaming applications more challenging than other components in Ray.

We cover how to build streaming applications with Ray in [ch06].

Interactive

Not all "real-time-ish" applications are necessarily "streaming" applications. A common example of this is when you are interactively exploring a dataset. Similarly, interacting with user input (e.g. serving models) can be considered interactive rather than batch, but it is handled separately from the streaming libraries with "Ray Serve."
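
As a rough sketch of Ray Serve, assuming the Ray 2.x Serve API (handle semantics have changed across Ray versions, so treat this as a sketch rather than a recipe); the Doubler class is a trivial stand-in for a real model server:

    import ray
    from ray import serve

    ray.init()

    @serve.deployment
    class Doubler:
        # A trivial stand-in for a real model server.
        def __call__(self, x: int) -> int:
            return x * 2

    # Deploy and get a handle for interactive, low-latency calls.
    handle = serve.run(Doubler.bind())
    print(ray.get(handle.remote(21)))  # 42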

What Ray is not

While Ray is a general-purpose distributed system, it’s important to note there are some things Ray is not (although of course, you could make it be, but you may not want to).

  • SQL / Analytics Engine

  • Data Storage system

  • Suitable for running nuclear reactors

  • Fully Language Independent

In all of these cases, Ray can be used to do a bit of each, but you're likely better off using more specialized tooling. For example, while Ray does have a key/value store, it isn't designed to survive the loss of the leader node. This doesn't mean that if you find yourself working on a problem that needs a bit of SQL or some non-Python libraries, Ray cannot meet your needs; it just means you may need to bring in additional tools.

Conclusion

Ray has the potential to greatly simplify your development and operational overhead for medium-to-large-scale problems. It achieves this by offering a unified API across a variety of traditionally separate problems while providing serverless scalability. If you have problems spanning the domains that Ray serves, or are just tired of the operational overhead of managing your own clusters, we hope you'll join us on the adventure of learning Ray. In the next chapter, we'll show you how to get Ray installed in local mode on your machine, and we'll look at a few different hello worlds from some of the ecosystems that Ray supports (actors, big data, etc.).


1. You can also use Ray from Java. Like many Python applications, under the hood there is a lot of C++ and some Fortran. Ray streaming also has some Java components.
2. Not exactly the same lab, but its subsequent iteration; its name is the RISELab (https://rise.cs.berkeley.edu).
3. A cloud computing model in which the cloud provider allocates machine resources on demand, taking care of the servers on behalf of its customers.
4. ARM support, including for Raspberry Pis and native M1 machines, requires manual building for now.
5. Much of modern Ray will automatically initialize a context if one is not present, allowing you to skip even this step.
6. Unfortunately, a head node is also a single point of failure. If you lose the head node, you will lose the cluster and need to re-create it. Moreover, if you lose the head node, existing worker nodes can become orphaned and will have to be removed manually.
7. Ray currently supports AWS, Azure, and GCP.
8. Albeit with large variances in speed. This can get more complicated when your code needs specific libraries or hardware, for example.
9. For those of you familiar, this is in the space of "reactive systems."