
Commit 333eeeb

talsperre and saikonen authored
Add static and runtime dag info, API to fetch ancestor and successor tasks (#2124)
* Add static and runtime dag info, API to fetch ancestor tasks
* Add API to get immediate successors
* Add API for getting closest siblings
* Update metadata API params
* Refactor ancestor and successor client code
* Remove unnecessary prints
* Support querying ancestors and successors in local metadata provider
* Refactor and simplify client code
* Make query logic more descriptive
* Add core tests for ancestor task API
* Add core test for immediate successor API
* Add endpoint in OSS metadata service
* Add logs to tests
* Log foreach stack to metadata, update query logic
* Add more comments to code
* Run black formatting
* Set monitor to None in filter tasks API
* Import urlencode
* Address comments
* Update logic for siblings, make it work for static splits as well
* Update service URL for filter task requests, update query param names
* Fix bug in parsing steps due to different data formats across metadata services
* JSON serialize the ancestry metadata
* Address comments
* Update docstrings
* Remove duplicate code
* Address comments
* Remove commented out code
* Update docstrings
* Remove spurious import in core
* Update OSS metadata service API call
* Remove commented code from parent task tests
* Remove spurious function
* Remove spurious comment
* Address comments
* Update tests
* Address comments
* Update docstrings
* Address black comments
* Address black comments
* Update docstrings, remove duplicate property
* Return metadata service version needed for runtime dag APIs

---------

Co-authored-by: Sakari Ikonen <[email protected]>
1 parent c5925a2 commit 333eeeb

File tree

6 files changed (+527, -3 lines)


metaflow/client/core.py (+173, -1)

@@ -380,7 +380,7 @@ def __iter__(self) -> Iterator["MetaflowObject"]:
             _CLASSES[self._CHILD_CLASS]._NAME,
             query_filter,
             self._attempt,
-            *self.path_components
+            *self.path_components,
         )
         unfiltered_children = unfiltered_children if unfiltered_children else []
         children = filter(

@@ -1191,6 +1191,143 @@ def _iter_filter(self, x):
         # exclude private data artifacts
         return x.id[0] != "_"
 
+    def _iter_matching_tasks(self, steps, metadata_key, metadata_pattern):
+        """
+        Yield tasks from specified steps matching a foreach path pattern.
+
+        Parameters
+        ----------
+        steps : List[Step]
+            List of steps to search for tasks
+        metadata_pattern : str
+            Regex pattern to match against the `metadata_key` metadata value
+
+        Returns
+        -------
+        Iterator[Task]
+            Tasks matching the foreach path pattern
+        """
+        flow_id, run_id, _, _ = self.path_components
+
+        for step in steps:
+            task_pathspecs = self._metaflow.metadata.filter_tasks_by_metadata(
+                flow_id, run_id, step.id, metadata_key, metadata_pattern
+            )
+            for task_pathspec in task_pathspecs:
+                yield Task(pathspec=task_pathspec, _namespace_check=False)
+
+    @property
+    def parent_tasks(self) -> Iterator["Task"]:
+        """
+        Yields all parent tasks of the current task, if any exist.
+
+        Yields
+        ------
+        Task
+            Parent task of the current task
+
+        """
+        flow_id, run_id, _, _ = self.path_components
+
+        steps = list(self.parent.parent_steps)
+        if not steps:
+            return []
+
+        current_path = self.metadata_dict.get("foreach-execution-path", "")
+
+        if len(steps) > 1:
+            # Static join - use exact path matching
+            pattern = current_path or ".*"
+            yield from self._iter_matching_tasks(
+                steps, "foreach-execution-path", pattern
+            )
+            return
+
+        # Handle single step case
+        target_task = Step(
+            f"{flow_id}/{run_id}/{steps[0].id}", _namespace_check=False
+        ).task
+        target_path = target_task.metadata_dict.get("foreach-execution-path")
+
+        if not target_path or not current_path:
+            # (Current task, "A:10") and (Parent task, "")
+            # Pattern: ".*"
+            pattern = ".*"
+        else:
+            current_depth = len(current_path.split(","))
+            target_depth = len(target_path.split(","))
+
+            if current_depth < target_depth:
+                # Foreach join
+                # (Current task, "A:10,B:13") and (Parent task, "A:10,B:13,C:21")
+                # Pattern: "A:10,B:13,.*"
+                pattern = f"{current_path},.*"
+            else:
+                # Foreach split or linear step
+                # Option 1:
+                # (Current task, "A:10,B:13,C:21") and (Parent task, "A:10,B:13")
+                # Option 2:
+                # (Current task, "A:10,B:13") and (Parent task, "A:10,B:13")
+                # Pattern: "A:10,B:13"
+                pattern = ",".join(current_path.split(",")[:target_depth])
+
+        yield from self._iter_matching_tasks(steps, "foreach-execution-path", pattern)
+
+    @property
+    def child_tasks(self) -> Iterator["Task"]:
+        """
+        Yields all child tasks of the current task, if any exist.
+
+        Yields
+        ------
+        Task
+            Child task of the current task
+        """
+        flow_id, run_id, _, _ = self.path_components
+        steps = list(self.parent.child_steps)
+        if not steps:
+            return []
+
+        current_path = self.metadata_dict.get("foreach-execution-path", "")
+
+        if len(steps) > 1:
+            # Static split - use exact path matching
+            pattern = current_path or ".*"
+            yield from self._iter_matching_tasks(
+                steps, "foreach-execution-path", pattern
+            )
+            return
+
+        # Handle single step case
+        target_task = Step(
+            f"{flow_id}/{run_id}/{steps[0].id}", _namespace_check=False
+        ).task
+        target_path = target_task.metadata_dict.get("foreach-execution-path")
+
+        if not target_path or not current_path:
+            # (Current task, "A:10") and (Child task, "")
+            # Pattern: ".*"
+            pattern = ".*"
+        else:
+            current_depth = len(current_path.split(","))
+            target_depth = len(target_path.split(","))
+
+            if current_depth < target_depth:
+                # Foreach split
+                # (Current task, "A:10,B:13") and (Child task, "A:10,B:13,C:21")
+                # Pattern: "A:10,B:13,.*"
+                pattern = f"{current_path},.*"
+            else:
+                # Foreach join or linear step
+                # Option 1:
+                # (Current task, "A:10,B:13,C:21") and (Child task, "A:10,B:13")
+                # Option 2:
+                # (Current task, "A:10,B:13") and (Child task, "A:10,B:13")
+                # Pattern: "A:10,B:13"
+                pattern = ",".join(current_path.split(",")[:target_depth])
+
+        yield from self._iter_matching_tasks(steps, "foreach-execution-path", pattern)
+
     @property
     def metadata(self) -> List[Metadata]:
         """

@@ -1905,6 +2042,41 @@ def environment_info(self) -> Optional[Dict[str, Any]]:
         for t in self:
             return t.environment_info
 
+    @property
+    def parent_steps(self) -> Iterator["Step"]:
+        """
+        Yields parent steps for the current step.
+
+        Yields
+        ------
+        Step
+            Parent step
+        """
+        graph_info = self.task["_graph_info"].data
+
+        if self.id != "start":
+            flow, run, _ = self.path_components
+            for node_name, attributes in graph_info["steps"].items():
+                if self.id in attributes["next"]:
+                    yield Step(f"{flow}/{run}/{node_name}", _namespace_check=False)
+
+    @property
+    def child_steps(self) -> Iterator["Step"]:
+        """
+        Yields child steps for the current step.
+
+        Yields
+        ------
+        Step
+            Child step
+        """
+        graph_info = self.task["_graph_info"].data
+
+        if self.id != "end":
+            flow, run, _ = self.path_components
+            for next_step in graph_info["steps"][self.id]["next"]:
+                yield Step(f"{flow}/{run}/{next_step}", _namespace_check=False)
+
 
 class Run(MetaflowObject):
     """

metaflow/metadata_provider/metadata.py (+33)

@@ -5,6 +5,7 @@
 from collections import namedtuple
 from itertools import chain
 
+from typing import List
 from metaflow.exception import MetaflowInternalError, MetaflowTaggingError
 from metaflow.tagging_util import validate_tag
 from metaflow.util import get_username, resolve_identity_as_tuple, is_stringish

@@ -672,6 +673,38 @@ def _register_system_metadata(self, run_id, step_name, task_id, attempt):
         if metadata:
             self.register_metadata(run_id, step_name, task_id, metadata)
 
+    @classmethod
+    def filter_tasks_by_metadata(
+        cls,
+        flow_name: str,
+        run_id: str,
+        step_name: str,
+        field_name: str,
+        pattern: str,
+    ) -> List[str]:
+        """
+        Filter tasks by metadata field and pattern, returning task pathspecs that match the criteria.
+
+        Parameters
+        ----------
+        flow_name : str
+            Flow name that the run belongs to
+        run_id : str
+            Run id which, together with flow_name, identifies the specific Run whose tasks to query
+        step_name : str
+            Step name to query tasks from
+        field_name : str
+            Metadata field name to query
+        pattern : str
+            Pattern to match in metadata field value
+
+        Returns
+        -------
+        List[str]
+            List of task pathspecs that satisfy the query
+        """
+        raise NotImplementedError()
+
     @staticmethod
     def _apply_filter(elts, filters):
         if filters is None:
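
The base class only defines the contract; the local and service providers below supply the implementations. As a rough illustration of the call and return shape (the flow/run/step names and results here are placeholders, not from this commit), a query returns plain task pathspec strings that the client then wraps in Task objects:

    # Illustrative only: any concrete MetadataProvider subclass.
    pathspecs = provider.filter_tasks_by_metadata(
        "LinearFlow", "123", "join", "foreach-execution-path", "A:10,.*"
    )
    # e.g. ["LinearFlow/123/join/7", "LinearFlow/123/join/9"]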

metaflow/plugins/metadata_providers/local.py (+66)

@@ -2,10 +2,12 @@
 import glob
 import json
 import os
+import re
 import random
 import tempfile
 import time
 from collections import namedtuple
+from typing import List
 
 from metaflow.exception import MetaflowInternalError, MetaflowTaggingError
 from metaflow.metadata_provider.metadata import ObjectOrder

@@ -202,6 +204,70 @@ def _optimistically_mutate():
                 "Tagging failed due to too many conflicting updates from other processes"
             )
 
+    @classmethod
+    def filter_tasks_by_metadata(
+        cls,
+        flow_name: str,
+        run_id: str,
+        step_name: str,
+        field_name: str,
+        pattern: str,
+    ) -> List[str]:
+        """
+        Filter tasks by metadata field and pattern, returning task pathspecs that match the criteria.
+
+        Parameters
+        ----------
+        flow_name : str
+            Identifier for the flow
+        run_id : str
+            Identifier for the run
+        step_name : str
+            Name of the step to query tasks from
+        field_name : str
+            Name of metadata field to query
+        pattern : str
+            Pattern to match in metadata field value
+
+        Returns
+        -------
+        List[str]
+            List of task pathspecs that match the query criteria
+        """
+        tasks = cls.get_object("step", "task", {}, None, flow_name, run_id, step_name)
+        if not tasks:
+            return []
+
+        regex = re.compile(pattern)
+        matching_task_pathspecs = []
+
+        for task in tasks:
+            task_id = task.get("task_id")
+            if not task_id:
+                continue
+
+            if pattern == ".*":
+                # If the pattern is ".*", we can match all tasks without reading metadata
+                matching_task_pathspecs.append(
+                    f"{flow_name}/{run_id}/{step_name}/{task_id}"
+                )
+                continue
+
+            metadata = cls.get_object(
+                "task", "metadata", {}, None, flow_name, run_id, step_name, task_id
+            )
+
+            if any(
+                meta.get("field_name") == field_name
+                and regex.match(meta.get("value", ""))
+                for meta in metadata
+            ):
+                matching_task_pathspecs.append(
+                    f"{flow_name}/{run_id}/{step_name}/{task_id}"
+                )
+
+        return matching_task_pathspecs
+
     @classmethod
     def _get_object_internal(
         cls, obj_type, obj_order, sub_type, sub_order, filters, attempt, *args
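
The foreach-execution-path values and the patterns built by parent_tasks/child_tasks compose as ordinary regular expressions against this provider's re.match call. A small standalone check, using made-up path values, shows how the matching behaves:

    import re

    # Hypothetical metadata values: a task at foreach depth 2 and candidates at depth 3.
    paths = ["A:10,B:13", "A:10,B:13,C:21", "A:10,B:14,C:7"]

    # Pattern used when descending into a nested foreach (children of "A:10,B:13").
    pattern = re.compile("A:10,B:13,.*")
    print([p for p in paths if pattern.match(p)])    # ['A:10,B:13,C:21']

    # Exact-prefix pattern used for a foreach join / linear step: truncate to the target depth.
    print(",".join("A:10,B:13,C:21".split(",")[:2]))  # 'A:10,B:13'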

metaflow/plugins/metadata_providers/service.py (+51)

@@ -4,6 +4,7 @@
 
 import requests
 
+from typing import List
 from metaflow.exception import (
     MetaflowException,
     MetaflowInternalError,

@@ -13,6 +14,7 @@
 from metaflow.metadata_provider.heartbeat import HB_URL_KEY
 from metaflow.metaflow_config import SERVICE_HEADERS, SERVICE_RETRY_COUNT, SERVICE_URL
 from metaflow.sidecar import Message, MessageTypes, Sidecar
+from urllib.parse import urlencode
 from metaflow.util import version_parse
 
 

@@ -318,6 +320,55 @@ def _new_task(
         self._register_system_metadata(run_id, step_name, task["task_id"], attempt)
         return task["task_id"], did_create
 
+    @classmethod
+    def filter_tasks_by_metadata(
+        cls,
+        flow_name: str,
+        run_id: str,
+        step_name: str,
+        field_name: str,
+        pattern: str,
+    ) -> List[str]:
+        """
+        Filter tasks by metadata field and pattern, returning task pathspecs that match the criteria.
+
+        Parameters
+        ----------
+        flow_name : str
+            Flow name that the run belongs to
+        run_id : str
+            Run id which, together with flow_name, identifies the specific Run whose tasks to query
+        step_name : str
+            Step name to query tasks from
+        field_name : str
+            Metadata field name to query
+        pattern : str
+            Pattern to match in metadata field value
+
+        Returns
+        -------
+        List[str]
+            List of task pathspecs that satisfy the query
+        """
+        query_params = {
+            "metadata_field_name": field_name,
+            "pattern": pattern,
+            "step_name": step_name,
+        }
+        url = ServiceMetadataProvider._obj_path(flow_name, run_id, step_name)
+        url = f"{url}/filtered_tasks?{urlencode(query_params)}"
+        try:
+            resp = cls._request(None, url, "GET")
+        except Exception as e:
+            if e.http_code == 404:
+                # filter_tasks_by_metadata endpoint does not exist in the version of metadata service
+                # deployed currently. Raise a more informative error message.
+                raise MetaflowInternalError(
+                    "The version of metadata service deployed currently does not support filtering tasks by metadata. "
+                    "Upgrade Metadata service to version 2.15 or greater to use this feature."
+                ) from e
+        return resp
+
     @staticmethod
     def _obj_path(
         flow_name,
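
For reference, the service-backed provider issues a plain GET against the new filtered_tasks endpoint. A quick sketch of the URL it builds, assuming _obj_path resolves to the usual /flows/<flow>/runs/<run>/steps/<step> form and using placeholder names:

    from urllib.parse import urlencode

    query_params = {
        "metadata_field_name": "foreach-execution-path",
        "pattern": "A:10,.*",
        "step_name": "join",
    }
    # Placeholder object path; the real value comes from ServiceMetadataProvider._obj_path(...).
    print(f"/flows/LinearFlow/runs/123/steps/join/filtered_tasks?{urlencode(query_params)}")
    # /flows/LinearFlow/runs/123/steps/join/filtered_tasks?metadata_field_name=foreach-execution-path&pattern=A%3A10%2C.%2A&step_name=join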
