Commit 0cafd16

add custom deco docs (#150)
* add custom deco docs
* fix links
* fix a typo
1 parent 46ee039 commit 0cafd16

File tree

12 files changed: +981 -0 lines changed


docs/index.md

Lines changed: 1 addition & 0 deletions
@@ -34,6 +34,7 @@ Metaflow makes it easy to build and manage real-life data science, AI, and ML pr

- [Debugging Flows](metaflow/debugging)
- [Visualizing Results](metaflow/visualizing-results/)
- [Configuring Flows](metaflow/configuring-flows/introduction)
- [Composing Flows with Custom Decorators](metaflow/composing-flows/introduction) *New*

## II. Scaling Flows

Lines changed: 214 additions & 0 deletions
@@ -0,0 +1,214 @@
# Advanced Custom Decorators

In addition to running logic before and after user code (as shown on
[the previous page](/metaflow/composing-flows/custom-decorators)), a decorator can
override the `@step` code entirely, executing alternative logic in its place.
A decorator can also take action when the user code fails.

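As a quick recap before diving in, a minimal sketch of the shape such decorators take, using the same `user_step_decorator` protocol as all the examples on this page (the decorator name here is illustrative):

```python
from metaflow import user_step_decorator

@user_step_decorator
def my_decorator(step_name, flow, inputs=None, attributes=None):
    # code here runs before the user code
    yield  # the user code executes at the yield point
    # code here runs after the user code completes
```
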
## Catching failures in the user code

A decorator can catch failures in the user code by wrapping `yield` in a `try`-`except` block. The
following example shows the pattern in action: it captures any exception raised in the user code and asks
ChatGPT for advice on how to fix it. Save the module to `ai_debug.py`:

```python
import os
import inspect
import traceback

from metaflow import user_step_decorator

PROMPT = """
I have a Metaflow step that is defined as follows:

{source}

It raised the following exception:

{stack_trace}

Provide suggestions on how to fix it.
"""

@user_step_decorator
def ai_debug(step_name, flow, inputs=None, attributes=None):
    source = inspect.getsource(getattr(flow, step_name))
    try:
        # the user code executes at the yield point
        yield
    except Exception:
        print("❌ Step failed:")
        stack_trace = traceback.format_exc()
        prompt_gpt(PROMPT.format(source=source, stack_trace=stack_trace))
        raise

def prompt_gpt(prompt):
    import requests
    OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
    if OPENAI_API_KEY:
        print("🧠 Asking AI for help..")
        url = "https://api.openai.com/v1/chat/completions"
        headers = {
            "Authorization": f"Bearer {OPENAI_API_KEY}",
            "Content-Type": "application/json"
        }
        data = {
            "model": "gpt-4",
            "messages": [{"role": "user", "content": prompt}]
        }
        response = requests.post(url, headers=headers, json=data)
        resp = response.json()["choices"][0]["message"]["content"]
        print(f"🧠💡 AI suggestion:\n\n{resp}")
    else:
        print("Specify OPENAI_API_KEY for debugging help")
```

You can test the decorator, for example, with this flow:

```python
import math
from metaflow import FlowSpec, step

from ai_debug import ai_debug

class FailFlow(FlowSpec):

    @ai_debug
    @step
    def start(self):
        x = 3
        for i in range(5):
            # raises ValueError once x - i turns negative
            math.sqrt(x - i)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    FailFlow()
```
Set your OpenAI API key in the environment variable `OPENAI_API_KEY` and run the flow.
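For example, assuming the flow above is saved to `failflow.py` (the same filename used in the command further below):

```
OPENAI_API_KEY=<your-key> python failflow.py run
```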

The results are impressive:

```mdx-code-block
import ReactPlayer from 'react-player';
```

<ReactPlayer controls muted playsinline url='/assets/ai_debug.mp4' width='100%' height='100%'/>

## Skipping the user code

A decorator can decide to skip execution of the user code by yielding an empty dictionary, i.e. `yield {}`. Even when
the user code is skipped, a task is started - to execute the custom decorator - but the task finishes as soon as the
decorator completes.

The following example leverages this feature to implement a `@memoize` decorator that reuses past results, skipping
redundant recomputation:

```python
from metaflow import Flow, user_step_decorator, current

@user_step_decorator
def memoize(step_name, flow, inputs=None, attributes=None):
    artifact = attributes['artifact']
    reset = attributes.get('reset')
    if reset and getattr(flow, reset, False):
        print("⚙️ memoized results disabled - running the step")
        yield
    else:
        try:
            # look up the artifact in the latest successful run
            run = Flow(current.flow_name).latest_successful_run
            previous_value = run[step_name].task[artifact].data
        except Exception:
            print("⚙️ previous results not found - running the step")
            yield
        else:
            print(f"✅ reusing results from a previous run {run.id}")
            setattr(flow, artifact, previous_value)
            # yield an empty dictionary to skip the user code
            yield {}
```

Note that `Flow` adheres to [Metaflow namespaces](/scaling/tagging), so `@memoize` can be used safely by many
concurrent users and production runs without intermixing results.

The following flow utilizes `@memoize` to skip downloading data and recomputing taxi fares in the
`compute_fare` step:

```python
from metaflow import FlowSpec, step, Parameter, pypi

from memoize import memoize

URL = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2020-01.parquet'

class ComputeTotalFare(FlowSpec):

    reset = Parameter('reset', default=False, is_flag=True)
    url = Parameter('url', default=URL)

    @step
    def start(self):
        self.next(self.compute_fare)

    # reuse a previously computed total_fare unless --reset is set
    @memoize(artifact='total_fare', reset='reset')
    @pypi(packages={'duckdb': '1.3.2'})
    @step
    def compute_fare(self):
        import duckdb
        SQL = f"SELECT SUM(fare_amount) AS total_fare FROM '{self.url}'"
        self.total_fare = duckdb.query(SQL).fetchone()[0]
        self.next(self.end)

    @step
    def end(self):
        print(f"Total taxi fares: ${self.total_fare}")

if __name__ == '__main__':
    ComputeTotalFare()
```
You can use the `--reset` flag to force recomputation of results.
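For example, assuming the flow is saved to `taxifare.py` (a hypothetical filename):

```
python taxifare.py run --reset
```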
## Replacing the user code

A decorator may decide to execute another function instead of the step function defined in the flow - just
`yield` a callable that takes a `FlowSpec` object (`self` in steps) as its argument.

The following example implements a `@fallback` decorator that first attempts to run the user code; if it
fails - indicated by `current.retry_count > 0` - it executes a fallback function instead of re-executing the user code:

```python
from metaflow import user_step_decorator, current

@user_step_decorator
def fallback(step_name, flow, inputs=None, attributes=None):
    def _fallback_step(self):
        print("🛟 step failed: executing a fallback")
        var = attributes.get('indicator')
        if var:
            # store an artifact indicating that the fallback was used
            setattr(self, var, True)

    if current.retry_count == 0:
        # first attempt: run the user code as-is
        yield
    else:
        # on retries, replace the user code with the fallback function
        yield _fallback_step
```

If you pass an `indicator` attribute to the decorator, it stores a correspondingly named artifact indicating that the
step failed. You can test the decorator with the `FailFlow` above. Note that you need to apply [the `@retry`
decorator](/scaling/failures#retrying-tasks-with-the-retry-decorator) to enable retries:

```
python failflow.py run --with retry --with fallback.fallback:indicator=failed
```

:::info
The fallback function cannot modify the flow’s control logic - it cannot change the target of
a `self.next` call. The overall flow structure remains unchanged, even when a fallback
function is used.
:::

Lines changed: 119 additions & 0 deletions
@@ -0,0 +1,119 @@
# The BaseFlow Pattern

The previous sections introduced custom decorators and mutators, which let
you compose flows from reusable components. Production-grade ML and AI projects
often consist of many such components for data access and transformation,
quality checks, model training and inference, and publishing results,
amongst other needs.

It’s beneficial to let end users focus on developing and iterating on
domain-specific logic, while minimizing visible boilerplate and project
scaffolding. This is where *the BaseFlow pattern* helps: it provides a
common foundation that bundles all necessary components, making them
readily available to the user.

## Defining a base flow

A `BaseFlow` is a class that inherits from `FlowSpec` and serves as a
foundation for other flows in a project. It can define shared components
such as flow mutators, `Config`s, `Parameter`s, and helper methods, but not steps
themselves. Individual flows in the project then inherit from `BaseFlow`,
automatically gaining access to the common functionality and ensuring consistency
across flows.

A common feature of the BaseFlow pattern is a shared configuration file that governs
all top-level concerns of the project. For the following example, we can define a
`project.toml`:

```toml
name = 'myproject'

# run the flow hourly
schedule = "0 * * * *"

[limits]
cpu = 2
memory = 16000
disk = 10000
```

We use the config to set up a base flow:

```python
import tomllib

from metaflow import Config, FlowSpec, project, config_expr, schedule

from flow_linter import flow_linter

def parse_limits(x):
    return tomllib.loads(x)['limits']

@flow_linter
@project(name=config_expr('project.name'))
@schedule(cron=config_expr('project.schedule'))
class BaseFlow(FlowSpec):

    project_config = Config('project', default='project.toml', parser=tomllib.loads)
    limits = Config('limits', default='project.toml', parser=parse_limits)

    def number_of_rows(self):
        # shared helper; assumes the flow has a `table` artifact
        return len(self.table)
```

Note the following details:

- We read `project.toml` as a `Config`, so all its values are available to all derived flows.

- We ensure that all flows use `@flow_linter`, which [we
defined previously](/metaflow/composing-flows/mutators#introspecting-a-flow-and-applying-configs),
and use the project config to read its `limits`.

- We use the config to parametrize `@project` and `@schedule`.

- We define a helper method, `number_of_rows`, which [comes in
handy with `@dataset`](/metaflow/composing-flows/mutators#applying-multiple-decorators-with-a-step-mutator).

Another common pattern is to include metadata, [such as Git
information](/metaflow/configuring-flows/custom-parsers#including-default-configs-in-flows), in flows
automatically. Depending on your needs, your `BaseFlow` can grow arbitrarily feature-rich.
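
As an illustrative sketch - not part of the example project above - a hypothetical `git_commit` helper on `BaseFlow` could capture the current commit, assuming runs are launched from inside a Git checkout:

```python
import subprocess

from metaflow import FlowSpec

class BaseFlow(FlowSpec):

    def git_commit(self):
        # hypothetical helper: capture the commit hash of the current checkout;
        # assumes `git` is available where the run is launched
        return subprocess.check_output(
            ['git', 'rev-parse', 'HEAD'], text=True
        ).strip()
```

A derived flow could then persist it, e.g. `self.commit = self.git_commit()` in its `start` step, making the commit queryable as an artifact.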
## Using a base flow

Here is an example flow that uses the `BaseFlow` defined above:

```python
from baseflow import BaseFlow
from metaflow import step, Config, current, resources

from dataset import dataset

class ComposedFlow(BaseFlow):

    data_config = Config('dataset', default='dataset.json')

    @resources(cpu=2)
    @dataset(url=data_config.url)
    @step
    def start(self):
        print(f"Project {current.project_name}")
        # number_of_rows is a helper inherited from BaseFlow
        print("Number of rows:", self.number_of_rows())
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    ComposedFlow()
```
Thanks to `BaseFlow`, derived flows remain clean and minimal, despite including rich functionality under the hood, such as `@project`, `@schedule`, and `@flow_linter`. Shared helper methods also make it easy to equip all derived flows with common utilities, like `number_of_rows` in the example above.
Real-world projects often involve enough complexity and nuance that a single common foundation
can't cover every need. Instead of aiming for perfect, all-encompassing abstractions in `BaseFlow`,
it's better to allow derived flows to customize behavior as needed - such as with the flow-specific
`data_config` in the example above.