Skip to content

Latest commit

 

History

History
815 lines (610 loc) · 37.2 KB

ch08.asciidoc

File metadata and controls

815 lines (610 loc) · 37.2 KB

Ray Workflows

With contributions from Carlos Andrade Costa

Real-life and modern applications in a wide range of domains are often a combination of multiple interdependent steps. For example, in AI/ML workflows, training workloads require multiple steps for data cleaning, balancing and augmentation, while model serving often includes many sub-tasks and integration with long-running business processes. Different steps in the workflows can depend on multiple upstreams and sometimes require different scaling tools.

Computer libraries for workflow management date back over 25 years, with new tools, focused on AI/ML emerging. Workflow specifications range from graphical user interfaces to custom formats, YAML/XML, and libraries in full-fledged programming languages. Specifying workflows in code allows you to use general programming tools, like source control for versioning and collaboration.

In this chapter you will learn the basics of Ray’s Workflow implementation and some simple examples of its usage

What is Ray Workflows?

Ray Workflows extends Ray Core by adding workflow primitives, providing support for programmatic workflow execution with a shared interface with tasks and actors. This allows you to use Ray’s core primitives as part of your workflow’s steps. Ray Workflows is targeted at supporting both traditional machine learning and data workflows (e.g. data pre-processing and training) and long-running business workflows, including model serving integration. It leverages Ray tasks for execution to provide scalability and reliability. Ray’s workflow primitives greatly reduce the burden of embedding workflow logic into application steps.

Why is it different from other solutions?

Unlike other popular workflow frameworks (e.g. Apache Airflow, Kubeflow Pipelines, and others), which focus on tools integration and deployment orchestration, Ray Workflows focuses on lower-level workflow primitives enabling programmatic workflows (the approach was originally introduced by cadence workflow[1]). This programmatic approach can be considered a lower level compared to other implementations, this low-level approach allows for unique workflow management features.

Note

Ray Workflows focuses on embedding core workflow primitives into Ray core to enable rich programmatic workflows, as opposed to supporting tools integration and deployment orchestration.

Ray Workflows Features

In this section, we will walk through the main features of Ray Workflows, review the core primitives, and see how they are used in simple examples.

What are the main features?

  • Durability: By adding virtual actors (see below) Ray Workflows adds durability guarantees to steps executed with Ray’s dynamic task graph.

  • Dependency management: Ray workflows leverages Ray’s runtime environment feature to snapshot the code dependencies of a workflow. This enables management of workflows and Virtual actors as code is upgraded over time.

  • Low-latency and scale: By leveraging Ray’s zero-copy overhead with Plasma[2], Ray Workflow provides sub-second overhead when launching tasks. Ray’s scalability extends to workflows, allowing you to create workflows with thousands of steps.

Note

Ray Workflows provides durable execution of workflow steps using any of Ray’s distributed libraries, with low-latency and dynamic dependency management.

Workflow primitives

Ray Workflows provides core primitives to build workflows with steps and a Virtual actor. Workflow primitives summarizes the core primitives and basic concepts in Ray workflows.

Table 1. Workflow primitives

Steps

Annotated functions with @workflow.step decorator. Steps are one-time executed when finished successfully, and retried on failure. Steps can be used as arguments for other step futures. To ensure recoverability, steps don’t support the ray.get() and ray.wait() calls.

Objects

Data object stored in Ray Object Store, with refences to these objects being passed into and returned from steps. When initially returned from a step, objects are checkpointed and can be shared with other workflows steps through Ray Object Store.

Workflows

Execution graph created with @Workflow.run() and Workflow.run_async(). The workflow execution is logged to storage once started for durability and can be resumed upon failure on any Ray cluster with access to the storage.

Workflows can also be dynamic, generating new steps in sub-workflows at runtime. Workflows support dynamic looping, nesting, and recursion. You can even dynamically add new steps to your workflow Directed Acyclic Graph (DAG) by returning more workflow steps from a workflow step.

Virtual Actors

Virtual actors are like regular Ray actors, which can hold member states. The main difference is that Virtual actors are backed by durable storage instead of only in process memory, which does not survive cluster restarts or worker failures.

Virtual actors manage long-running business workflows. They save their state into external storage for durability. They also support the launch of sub-workflows from method calls and receive externally triggered events.

You can use Virtual actors to add state to an otherwise stateless workflow.

Events

Workflows can be triggered by timers and external events through pluggable event listeners. Events can also be used as an argument for a step, making the step execution wait until the event is received.

Working with Basic Workflow Concepts

Workflows are built out of various different primitives, and you’ll start with learning how to use steps and objects.

Workflow, steps and objects

The code below shows a simple hello world workflow example, demonstrating how the step, object and workflow primitives work in a simple case.[3]

Example 1. Hello world workflow
import ray
from ray import workflow
from typing import List

# Creating an arbitrary Ray remote function
@ray.remote
def hello():
    return "hello"

# Defining a workflow step that puts an object into the object store
@workflow.step
def words() -> List[ray.ObjectRef]:
    return [hello.remote(), ray.put("world")]

# Defining a step that receives an object
@workflow.step
def concat(words: List[ray.ObjectRef]) -> str:
    return " ".join([ray.get(w) for w in words])

# Creating workflow
workflow.init("tmp/workflow_data")
output: "Workflow[int]" = concat.step(words.step())

# Running workflow
assert output.run(workflow_id="workflow_1") == "hello world"
assert workflow.get_status("workflow_1") == workflow.WorkflowStatus.SUCCESSFUL
assert workflow.get_output("workflow_1") == "hello world"

Similar to Ray tasks and Actors described in [ch03] and [ch04] you can explicitly assign computing resources (e.g., CPU core, GPUs,) to a step with the same arguments as in core Ray: num_cpus, num_gpus, and resources. For example:

Example 2. Adding resources to steps
from ray import workflow
@workflow.step(num_gpus=1)
def train_model() -> Model:
    pass  # This step is assigned a GPU by Ray.

train_model.step().run()

Dynamic workflows

In addition to the workflows with the predefined DAG. Ray allows you to create steps programmatically based on the current state of workflow execution - Dynamic workflows. You can use this type of workflow, for example, to implement recursion and more complex execution flows. A simple recursion can be illustrated with a recursive factorial program. The example below[4] shows how you can use recursion within a workflow (note that this is for illustration only and that other implementations with better performance exist without the need of Ray Workflows):

Example 3. Dynamic workflow
from ray import workflow

@workflow.step
def factorial(n: int) -> int:
    if n == 1:
        return 1
    else:
        return mult.step(n, factorial.step(n - 1))

@workflow.step
def mult(a: int, b: int) -> int:
    return a * b

# Calculate the factorial of 5 by creating a recursion of 5 steps
factorial_workflow = factorial.step(5).run()
assert factorial_workflow.run() == 120

Virtual actors

Virtual actors are Ray actors (see [ch04]), backed by durable storage instead of memory and are created with the decorator @virtual_actor. The example below[5] shows how to use a persistent Virtual actor to implement a counter:

Example 4. Virtual actors
from ray import workflow

@workflow.virtual_actor
class counter:
    def __init__(self):
        self.count = 0

    def incr(self):
        self.count += 1
        return self.count

workflow.init(storage="/tmp/workflows")


workflow1 = counter.get_or_create("counter_workflw")
assert c1.incr.run() == 1
assert c1.incr.run() == 2
Warning

Because a Virtual actor retrieves and stores its state before and after every step of execution, its state either has to be JSON serializable (in the form of state dictionary) or __getstate__ and __setstate__ methods should be provided, that convert the actor’s state to and from JSON serializable dictionary.

Workflows in Real Life

Let us take a look at the common steps for creating and managing a reference use case implementation with Ray workflows.

Building workflows

As seen before, you start with implementing individual workflow steps and declaring them with the @worfklow.step annotation. Similarly to a Ray task, steps can receive one or more inputs, where each input can be a specific value or a future - result of execution of one or more previous workflow steps. The return type of workflow is Workflow[T] and is a future with the value available after the execution of the workflow is completed. This process is illustrated in the code example below[6]. In this case, the steps get_value1() and get_value2() return futures which are passed to the sum step function.

Example 5. Implementing workflow step
from ray import workflow

@workflow.step
def sum(x: int, y: int, z: int) -> int:
    return x + y + z

@workflow.step
def get_value1() -> int:
    return 100

@workflow.step
def get_value2(x: int) -> int:
    return 10*x

sum_workflow = sum.step(get_val1.step(), get_val2.step(10), 100)

assert sum_workflow.run("sum_example") == 300

In order to simplify accessing step execution result and passing data between steps, Ray workflows allow you to explicitly name the steps. With this you can, for example, retrieve the results of step execution by calling workflow.get_output(workflow_id, name=”step_name”) which will return an ObjectRef[T]. If you do not explicitly name the step step, Ray will automatically generate one in the format of <WORKFLOW_ID>.<MODULE_NAME>.<FUNC_NAME>.

Note that you can call ray.get() on the returned reference, which will block until the workflow is completed. For example, ray.get(workflow.get_output("sum_example")) == 100.

Steps can be named in two different ways:

  • Using the .options(name=”step_name”)

  • Using the decorator @worfklows.step(name=”step_name”)

Managing workflows

Each workflow in Ray Workflows has a unique workflow_id. You can explicitly set a workflow ID during workflow startup, using .run(workflow_id=”workflow_id”). Same option is also applicable to .run_async(). If no ID is provided when calling .run() and run_async(), a random ID is generated.

Once created, workflows can be in the states shown in Workflow states.

Table 2. Workflow states

Running

Currently running in the cluster

Failed

Failed with an application error. It may be resumed from the failed step

Resumable

Worfklow that failed with a system error and can be resumed from the failed step

Canceled

Workflow has been canceled. It cannot be resumed and results are unavailable

Successful

Workflow completed successfully

Workflow management APIs shows a summary of the management APIs and how you can use them to manage workflows both individually or in bulk.

Table 3. Workflow management APIs
Single workflow Action Bulk Workflow Action

.get_status(workflow_id=<>)

Get status of workflows (running, resumable, failed, canceled, successful)

.list_all(<workflow_state1, workflow_state2, …>)

List all workflow in the states listed

.resume(workflow_id=<>)

Resume a workflow

.resume_all()

Resume all resumable workflows

.cancel(workflow_id=<>)

Cancel a workflow

.delete(workflow_id=<>)

Delete a workflow

Ray Workflows stores workflow information in your configured storage location. You configure the location either when creating the workflow with the decorator workflow.init(storage=<path>), or by setting the environment variable RAY_WORKFLOW_STORAGE.

You can use either regular/local or in a distributed storage using a S3 compatible API:

  • Local filesystem: either single node, for testing purposes only, or through a shared filesystem (e.g., NFS mount) across the nodes in the cluster. Location is passed as an absolute path.

  • S3 backend: Enable workflow data to be written to an S3 based backend for use in production.

If you do not specify a path, workflows will use the default location: /tmp/ray/workflow_data.

Warning

If no storage data location is specified, workflow data is saved locally and only works for a single-node Ray cluster.

Ray’s Workflow dependencies is actively under development. Once available, this feature will allow Ray to log the full runtime environment to storage, at the workflow submission time. By tracking this information, Ray can ensure the workflow can run on a different cluster.

Building a dynamic workflow

As mentioned before, you can create workflows dynamically by creating steps based on the current state of a given step. When such step is created, it is inserted into the original workflow DAG. The workflow below[7] shows an example of how to use a dynamic workflow to calculate the Fibonacci sequence:

Example 6. Dynamic workflow
from ray import workflow

@workflow.step
def add(a: int, b: int) -> int:
    return a + b

@workflow.step
def fib(n: int) -> int:
    if n <= 1:
        return n
    return add.step(fib.step(n - 1), fib.step(n - 2))

assert fib.step(10).run() == 55

Workflows with conditional steps

Workflows with conditional steps are central to many use cases. The example below[8] shows a simplified scenario of the workflow implementing a trip booking:

Example 7. Flight booking example
from ray import workflow

@workflow.step
def book_flight(...) -> Flight: ...

@workflow.step
def book_hotel(...) -> Hotel: ...

@workflow.step
def finalize_or_cancel(
    flights: List[Flight],
    hotels: List[Hotel]) -> Receipt: ...

@workflow.step
def book_trip(origin: str, dest: str, dates) ->
        "Workflow[Receipt]":
    # Note that the workflow engine will not begin executing
    # child workflows until the parent step returns.
    # This avoids step overlap and ensures recoverability.
    f1: Workflow = book_flight.step(origin, dest, dates[0])
    f2: Workflow = book_flight.step(dest, origin, dates[1])
    hotel: Workflow = book_hotel.step(dest, dates)
    return finalize_or_cancel.step([f1, f2], [hotel])

fut = book_trip.step("OAK", "SAN", ["6/12", "7/5"])
fut.run()  # returns Receipt(...)

Handling exceptions

You can choose to have Ray handle exceptions in one of two ways:

  • Automatic retry, until a maximum number of retries is reached

  • Catch and handle exception

You configure this in either the step decorator or via .options(). You specify the settings for the two techniques above, respectively, as follows:

  • max_retries: making the step to be retried upon failure until max_retries is reached. max_retries default is 3.

  • Catch exceptions: When true, this option will make the return value of the function to be converted to a Tuple[Optional[T], Optional[Exception]].

You can also pass these to the workflow.step() decorator.

The code below[9] shows an example of exception handling with these options:

Example 8. Exception handling
from ray import workflow
@workflow.step
def random_failure() -> str:
    if random.random() > 0.95:
        raise RuntimeError("Found failure")
    return "OK"

# Run 5 times before giging up
s1 = faulty_function.options(max_retries=5).step()
s1.run()

@workflow.step
def handle_errors(result: Tuple[str, Exception]):
    # Setting the exception field to NONE on success
    err = result[1]
    if err:
        return "There was an error: {}".format(err)
    else:
        return "OK"

# `handle_errors` receives a tuple of (result, exception).
s2 = faulty_function.options(catch_exceptions=True).step()
handle_errors.step(s2).run()

Handling durability guarantees

Tip

Ray Workflows ensures that once a step succeeds, it will never be re-executed. To enforce this guarantee, Ray Workflows logs the step result to a durable storage, ensuring that results from previous successful steps will not change when used in subsequent steps.

Ray’s workflows go beyond the durability of retrying within a cluster or single application. Workflows implements a failure model based on two statuses:

  • Cluster failure. If the cluster fails, any workflow running on the cluster are set to RESUMABLE state. Workflows that are in RESUMABLE state can be resumed on a different cluster. This can be done with ray.workflow.resume.all(). This will resume all resumable workflow jobs.

  • Driver failure. In this case the workflow will transition to the failed state and once the issue is resolved, it can be resumed from the failed step.

Warning

This is a beta API at the moment of writing and may change before becoming stable.

You can durability guarantees to create idempotent workflows that include steps that have side-effects. This is needed because a step can fail before its output is logged. The example below[10] shows how to use the durability guarantee to make a workflow idempotent:

Example 9. Idempotent workflow
from ray import workflow

@workflow.step
def generate_id() -> str:
   # Generate a unique idempotency token.
   return uuid.uuid4().hex

@workflow.step
def book_flight_idempotent(request_id: str) -> FlightTicket:
   if service.has_ticket(request_id):
       # Retrieve the previously created ticket.
       return service.get_ticket(request_id)
   return service.book_flight(request_id)

# SAFE: book_flight is written to be idempotent
request_id = generate_id.step()
book_flight_idempotent.step(request_id).run()

Extending Dynamic workflow with virtual actors

Virtual actors, described above, also allow sub-workflows to be called from each of their methods.

When you create a Virtual actor, Ray stores its initial state and class definition in the durable storage. As a workflow name used in the actor’s definition Ray stores it in the durable storage. When the actor’s method creates new steps they are dynamically appended to the workflow and executed. In this case both the step definition and its result are stored in the actor’s state. To retrieve the actor, you can use the decorator .get_actor(workflow_id="workflow_id").

You can also define workflows as read only. Because they don’t require logging, they incur less overhead. Additionally, because they don’t imply conflict issues with mutating methods in the actor, Ray can execute them concurrently.

The code below shows[11] an example of how Virtual actors can be used to manage state in a workflow:

Example 10. Virtual actors
from ray import workflow
import ray

@workflow.virtual_actor
class Counter:
    def __init__(self, init_val):
        self._val = init_val

    def incr(self, val=1):
        self._val += val
        print(self._val)

    @workflow.virtual_actor.readonly
    def value(self):
        return self._val

workflow.init()

# Initialize a Counter actor with id="my_counter".
counter = Counter.get_or_create("my_counter", 0)

# Similar to workflow steps, actor methods support:
# - `run()`, which will return the value
# - `run_async()`, which will return a ObjectRef
counter.incr.run(10)
assert counter.value.run() == 10

# Non-blocking execution.
counter.incr.run_async(10)
counter.incr.run(10)
assert 30 == ray.get(counter.value.run_async())

Virtual actors can also create sub-workflows that involve other methods in the Virtual actor or steps defined outside the actor class to be invoked. This means that a workflow can be launched inside a method or it can also be passed to another method. The code below shows an example:[12]

Example 11. Using sub workflows
from ray import workflow
import ray

@workflow.step
def double(s):
    return 2 * s

@workflow.virtual_actor
class Actor:
    def __init__(self):
        self.val = 1

    def double(self, update):
        step = double.step(self.val)
        if not update:
            # inside the method, a workflow can be launched
            return step
        else:
            # workflow can also be passed to another method
            return self.update.step(step)

    def update(self, v):
        self.val = v
        return self.val


handler = Actor.get_or_create("actor")
assert handler.double.run(False) == 2
assert handler.double.run(False) == 2
assert handler.double.run(True) == 2
assert handler.double.run(True) == 4

Virtual actors can also be used for sharing data between multiple workflows (even running on different Ray clusters). For example, Virtual actors may be used to store fitted parameters in a machine learning model such as a Python scikit-learn pipeline. The example below[13] illustrates a simple two-stage pipeline consisting of a standard scalar followed by a decision tree classifier. Each stage is implemented as a workflow step, directly invoking an instance of a Virtual actor defined in the class estimator_virtual_actor. Its member estimator is using __getstate__ and __setstate__ methods to convert its state to and from JSON serializable dictionary. The pipeline is trained when the 3rd input parameter of the input tuple is specified as 'fit' and the pipeline is used for prediction when that parameter is specified as 'predict'.

To train a pipeline, the workflow execution submits a training_tuple to the standard scalar, whose output is then piped through the classification model to train.

training_tuple = (X_train, y_train, 'fit')
classification.step(scaling.step(training_tuple, 'standardscalar'), 'decisiontree').run('training_pipeline')

To use the trained pipeline for prediction, the workflow execution submits a predict_tuple to the same chain of steps, although its 'predict' parameter invokes the predict() function in the Virtual actor. Prediction result is returned as another tuple with labels found in pred_y.

predict_tuple = (X_test, y_test, 'predict')
(X, pred_y, mode) = classification.step(scaling.step(predict_tuple, 'standardscalar'), 'decisiontree').run('prediction_pipeline')

The power of the workflow Virtual actor is to make the trained model available to another Ray cluster. Furthermore, machine learning workflow backed by a Virtual actor can incrementally update its state, such as recalculated time series features. This makes it easier to implement stateful time series analysis, including forecasting, prediction and anomaly detection.

Example 12. Machine learning workflow
import ray
from ray import workflow

import pandas as pd
import numpy as np
from sklearn import base
from sklearn.base import BaseEstimator
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split

ray.init(address='auto')
workflow.init()

@ray.workflow.virtual_actor
class estimator_virtual_actor():
    def __init__(self, estimator: BaseEstimator):
        if estimator is not None:
            self.estimator = estimator

    def fit(self, inputtuple):
        (X, y, mode)= inputtuple
        if base.is_classifier(self.estimator) or base.is_regressor(self.estimator):
            self.estimator.fit(X, y)
            return X, y, mode
        else:
            X = self.estimator.fit_transform(X)
            return X, y, mode

    @workflow.virtual_actor.readonly
    def predict(self, inputtuple):
        (X, y, mode) = inputtuple
        if base.is_classifier(self.estimator) or base.is_regressor(self.estimator):
            pred_y = self.estimator.predict(X)
            return X, pred_y, mode
        else:
            X = self.estimator.transform(X)
            return X, y, mode

    def run_workflow_step(self, inputtuple):
        (X, y, mode) = inputtuple
        if mode == 'fit':
            return self.fit(inputtuple)
        elif mode == 'predict':
            return self.predict(inputtuple)

    def __getstate__(self):
        return self.estimator

    def __setstate__(self, estimator):
        self.estimator = estimator

## prepare the data
X = pd.DataFrame(np.random.randint(0,100,size=(10000, 4)), columns=list('ABCD'))
y = pd.DataFrame(np.random.randint(0,2,size=(10000, 1)), columns=['Label'])

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

@workflow.step
def scaling(inputtuple, name):
    va = estimator_virtual_actor.get_or_create(name, StandardScaler())
    outputtuple = va.run_workflow_step.run_async(inputtuple)
    return outputtuple

@workflow.step
def classification(inputtuple, name):
    va = estimator_virtual_actor.get_or_create(name, DecisionTreeClassifier(max_depth=3))
    outputtuple = va.run_workflow_step.run_async(inputtuple)
    return outputtuple

training_tuple = (X_train, y_train, 'fit')
classification.step(scaling.step(training_tuple, 'standardscalar'), 'decisiontree').run('training_pipeline')

predict_tuple = (X_test, y_test, 'predict')
(X, pred_y, mode) = classification.step(scaling.step(predict_tuple, 'standardscalar'), 'decisiontree').run('prediction_pipeline')
assert pred_y.shape[0] == 2000

Long running workflows require special attention when used as sub-workflows, since sub-workflows block future actor calls when running. To properly handle long running workflow, it is recommended to use Workflows API to monitor its execution and run separate workflows, with deterministic names. This approach prevents that duplicate workflow will be launched in the case of a failure.

Warning

Sub-workflows block future actor method calls. It is not recommended to run a long running workflow as a sub-workflow of a Virtual actor.

Below is an example[14] of how to run a long running workflow without blocking:

Example 13. Non blocking workflow
from ray import workflow
import ray

@workflow.virtual_actor
class ShoppingCart:
    ...
    # check status via ``self.shipment_workflow_id`` for avoid blocking
    def do_checkout():
        # Deterministically generate a workflow id for idempotency.
        self.shipment_workflow_id = "ship_{}".format(self.order_id)
        # Run shipping workflow as a separate async workflow.
        ship_items.step(self.items).run_async(
            workflow_id=self.shipment_workflow_id)

Integrating Workflows with other Ray primitives

Ray workflows can be used with core Ray’s primitives. Here we will describe some common scenarios where Workflows API are integrated with a common Ray program. There are two main scenarios when integrating Workflows with tasks and actors:

  • Running a workflow from within a Ray task or actor

  • Using a Ray task or actor within a Workflow step

Another common case is passing object references between steps in a workflow. Ray object references can be passed as arguments and returned from any workflow step. The code below shows an example[15]:

Example 14. Using object references
from ray import workflow

@ray.remote
def do_add(a, b):
    return a + b

@workflow.step
def add(a, b):
    return do_add.remote(a, b)

add.step(ray.put(10), ray.put(20)).run() == 30

To ensure recoverability Ray workflows logs the contents to persistent storage. Thankfully, when passed to multiple different steps, Ray will not checkpoint the object more than once.

Warning

Ray actor handlers cannot be passed between steps.

Another consideration for you to integrate actors and tasks with Workflows is handling of the nested arguments. As described before, Workflow outputs are fully resolved when passed to a step, as a form to guarantee that all the ancestors of a step are executed before the current step is executed. The example[16] below illustrates this behavior:

Example 15. Using output arguments
import ray
from ray import workflow
from typing import List

@workflow.step
def add(values: List[int]) -> int:
    return sum(values)

@workflow.step
def get_val() -> int:
    return 10

ret = add.step([get_val.step() for _ in range(3)])
assert ret.run() == 30

Triggering Workflows (Connecting to events)

Workflows has a pluggable event system, allowing external events to trigger workflows. This framework provides an efficient built-in wait mechanism and guarantee of exactly-once event delivery semantics. This implies that the user doesn’t need to implement a trigger mechanism based on a running workflow step to react to an event. Like with the rest of workflows, for fault-tolerance, events are checkpointed upon occurrence.

Workflow events can be seen as a type of workflow step that completes only when the event occurs. The decorator .wait_for_event is used to create an event step.

The simple example[17] below shows a workflow step that finishes after 90 seconds and triggers the execution for an outer workflow:

Example 16. Using events
from ray import workflow
import time

# Create an event which finishes after 60 seconds.
event1_step = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 60)

# Create another event which finishes after 30 seconds.
event2_step = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 30)

@workflow.step
def gather(*args):
    return args;

# Gather will run after 60 seconds, when both event1 and event2 are done.
gather.step(event1_step, event2_step).run()

Events also support customer listeners by subclassing EventListener interface:[18]

Example 17. Custom event listeners
from ray import workflow
class EventListener:
    def __init__(self):
        """Optional constructor. Only the constructor with now arguments will be
          called."""
        pass

    async def poll_for_event(self, *args, **kwargs) -> Event:
        """Should return only when the event is received."""
        raise NotImplementedError

    async def event_checkpointed(self, event: Event) -> None:
        """Optional. Called after an event has been checkpointed and a transaction can
          be safely committed."""
        pass

Workflow Metadata

One of the important requirements for workflow execution is observability. Typically you want not only to see the workflow execution results but also to get the information about the internal states, for example, paths that execution took, execution steps performance, values of variables, etc. Ray’s workflow metadata provides support for both some of the standard and user-defined metadata options. Standard metadata, is split between workflow level metadata:

  • status: workflow states, can be one of RUNNING, FAILED, RESUMABLE, CANCELED, or SUCCESSFUL.

  • user_metadata: a python dictionary of custom metadata by the user via workflow.run().

  • stats: workflow running stats, including workflow start time and end time.

And step level metadata:

  • name: name of the step, either provided by the user via step.options() or generated by the system.

  • step_options: options of the step, either provided by the user via step.options() or default by system.

  • user_metadata: a python dictionary of custom metadata by the user via step.options().

  • stats: step running stats, including step start time and end time.

Ray workflows provides a simple API to obtain standard metadata:

workflow.get_metadata(workflow_id)

To get metadata about workflow and

workflow.get_metadata(workflow_id, name=<step name>)

to get metadata about a step. Both versions of the API return a dictionary containing all of the metadata for either workflow itself or an individual step.

In addition to the standard metadata, you can add custom ones, capturing parameters of interest either in workflow or specific step:

  • Workflow-level metadata can be added via .run(metadata=metadata)

  • Step-level metadata can be added via .options(metadata=metadata) or in the decorator @workflow.step(metadata=metadata)

Finally, you can define expose metadata from the Virtual Actors execution and also retrieve workflow/steps metadata to control execution.

Tip

The metrics that you add to Ray metrics are exposed as prometheus metrics just like Ray’s built in metrics.

Be aware that get_metadata() returns an immediate result at the invocation time, which means that not all fields might be available in the result.

Conclusion

In this chapter you learned how Ray Workflow adds workflows primitives to Ray, allowing you to create dynamic pipelines with rich workflow management support in Ray. Ray workflows allow for creation of common pipelines involving multiple steps, like data pre-processing, training, and long running business workflows. With Ray, the possibility of a programmatic workflow execution engine became feasible with a shared interface with Ray tasks and actors. This capability can greatly reduce the burden of orchestrating workflows and embedding workflow logic into application steps.

This said, be aware that Ray Remote (see ch3) provides basic execution sequencing and fork/merge capabilities based on the arguments availability. As a result for some simple use cases, usage of Ray workflow might seem as an overkill, but if you need execution reliability, restartability, programmatic control and metadata management (which you typically do), Ray workflow is a preferred implementation approach.


1. Cadence consists of a programming framework (or client library) that provides a “fault-oblivious” stateful programming model, allowing developers to create workflows the same way they are writing normal code.
2. Plasma is a shared memory store.
3. Complete code for this example is in Github
4. The full code for this example is in Github
5. The full code for this example is in Github
6. The full code is in Github
7. The complete code is in Github
8. The complete code for this example is in
9. The complete code for this example is in Github
10. The complete code for this example is in Github
11. The complete code for this example is in Github
12. The complete code for this example is in Github
13. The complete code for this example is in Github
14. The complete code for this example is in Github
15. The complete code for this example is in Github
16. The complete code for this example is in Github
17. Complete code for this example is in Github
18. Complete code for this example is in Github