-
Notifications
You must be signed in to change notification settings - Fork 92
implement get_lineage tool for complete lineage across dbt resources #461
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| kind: Enhancement or New Feature | ||
| body: Adds get_lineage tool for getting lineage across resources. | ||
| time: 2025-11-29T12:10:02.421001Z |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,6 +13,7 @@ | |
|
|
||
DEFAULT_PAGE_SIZE = 100
DEFAULT_MAX_NODE_QUERY_LIMIT = 10_000
# Maximum number of nodes the lineage tool returns per direction
# (ancestors / descendants); longer results are cut down to this many entries.
LINEAGE_LIMIT = 50
|
|
||
|
|
||
| class GraphQLQueries: | ||
|
|
@@ -302,6 +303,15 @@ class GraphQLQueries: | |
| } | ||
| """) | ||
|
|
||
| # Detail queries - reused for lineage search | ||
| GET_MODEL_DETAILS = load_query("get_model_details.gql") | ||
| GET_SOURCE_DETAILS = load_query("get_source_details.gql") | ||
| GET_SEED_DETAILS = load_query("get_seed_details.gql") | ||
| GET_SNAPSHOT_DETAILS = load_query("get_snapshot_details.gql") | ||
|
|
||
| # Lineage queries | ||
| GET_LINEAGE = load_query("lineage/get_lineage.gql") | ||
|
|
||
|
|
||
| class MetadataAPIClient: | ||
| def __init__(self, config_provider: ConfigProvider[DiscoveryConfig]): | ||
|
|
@@ -667,3 +677,301 @@ async def fetch_details( | |
| if not edges: | ||
| return [] | ||
| return [e["node"] for e in edges] | ||
|
|
||
|
|
||
| class LineageDirection(StrEnum): | ||
| """Direction for lineage traversal.""" | ||
| ANCESTORS = "ancestors" | ||
| DESCENDANTS = "descendants" | ||
| BOTH = "both" | ||
|
|
||
|
|
||
| class LineageResourceType(StrEnum): | ||
| """Resource types supported by the lineage API.""" | ||
| MODEL = "Model" | ||
| SOURCE = "Source" | ||
| SEED = "Seed" | ||
| SNAPSHOT = "Snapshot" | ||
| EXPOSURE = "Exposure" | ||
| METRIC = "Metric" | ||
| SEMANTIC_MODEL = "SemanticModel" | ||
| SAVED_QUERY = "SavedQuery" | ||
| MACRO = "Macro" | ||
| TEST = "Test" | ||
|
|
||
|
|
||
# Settings for looking a resource up by name, keyed by the accepted
# ``resource_type`` string.  Each entry holds the details query to run, the
# name of the response field ("response_path") and the GraphQL type name that
# is placed in the query filter ("gql_type", identical to the key).
_RESOURCE_SEARCH_CONFIG = {
    type_name: {
        "query": details_query,
        "response_path": "resources",
        "gql_type": type_name,
    }
    for type_name, details_query in (
        ("Model", GraphQLQueries.GET_MODEL_DETAILS),
        ("Source", GraphQLQueries.GET_SOURCE_DETAILS),
        ("Seed", GraphQLQueries.GET_SEED_DETAILS),
        ("Snapshot", GraphQLQueries.GET_SNAPSHOT_DETAILS),
    )
}
|
|
||
|
|
||
| class LineageFetcher: | ||
| """Fetcher for lineage data using the Discovery API's lineage query.""" | ||
|
|
||
def __init__(self, api_client: MetadataAPIClient):
    """Store the API client that every query issued by this fetcher goes through."""
    self.api_client = api_client
|
|
||
async def get_environment_id(self) -> int:
    """Return the environment ID read from the API client's configuration."""
    provider = self.api_client.config_provider
    return (await provider.get_config()).environment_id
|
|
||
async def search_resource_by_name(self, name: str, resource_type: str) -> list[dict]:
    """Find resources of one type whose unique ID matches ``name``.

    The environment's package list is fetched first.  One candidate unique ID
    is then built per package, in the form
    ``<resource_type lowercased>.<package>.<name>``, and a single details
    query is run for those IDs, filtered to the requested type.

    Args:
        name: The resource name/identifier to search for.
        resource_type: A key of ``_RESOURCE_SEARCH_CONFIG``
            ("Model", "Source", "Seed" or "Snapshot").

    Returns:
        One dict per match with ``uniqueId``, ``name`` and ``resourceType``
        keys; an empty list when there are no packages or no matches.

    Raises:
        ValueError: If ``resource_type`` is not supported.
    """
    search_config = _RESOURCE_SEARCH_CONFIG.get(resource_type)
    if search_config is None:
        supported = ", ".join(_RESOURCE_SEARCH_CONFIG)
        raise ValueError(
            f"Unsupported resource_type: {resource_type}. "
            f"Must be one of: {supported}"
        )

    environment_id = await self.get_environment_id()

    # The package list is always requested with resource "model", whatever
    # resource type is being searched for.
    packages_response = await self.api_client.execute_query(
        ResourceDetailsFetcher.GET_PACKAGES_QUERY,
        variables={"resource": "model", "environmentId": environment_id},
    )
    raise_gql_error(packages_response)

    package_names = packages_response["data"]["environment"]["applied"]["packages"]
    if not package_names:
        return []

    # The resource could live in any package, so try every one of them.
    id_prefix = resource_type.lower()
    candidate_ids = [f"{id_prefix}.{package}.{name}" for package in package_names]

    details_response = await self.api_client.execute_query(
        search_config["query"],
        {
            "environmentId": environment_id,
            "filter": {
                "uniqueIds": candidate_ids,
                "types": [search_config["gql_type"]],
            },
            # At most one hit per candidate ID is expected.
            "first": len(candidate_ids),
        },
    )
    raise_gql_error(details_response)

    found = details_response["data"]["environment"]["applied"]["resources"]["edges"]
    if not found:
        return []

    matches = []
    for edge in found:
        node = edge["node"]
        matches.append(
            {
                "uniqueId": node["uniqueId"],
                "name": node["name"],
                "resourceType": resource_type,
            }
        )
    return matches
|
|
||
async def search_all_resources(self, name: str) -> list[dict]:
    """Search every supported resource type for ``name`` concurrently.

    One ``search_resource_by_name`` call is started per key of
    ``_RESOURCE_SEARCH_CONFIG``.  Their results are concatenated in that key
    order into a single flat list of matches.
    """
    per_type_matches = await asyncio.gather(
        *(
            self.search_resource_by_name(name, type_name)
            for type_name in _RESOURCE_SEARCH_CONFIG
        )
    )
    return [match for found in per_type_matches for match in found]
|
|
||
def _build_selector(self, unique_id: str, direction: LineageDirection) -> str:
    """Build the dbt selector string for ``unique_id`` in the given direction.

    - ancestors: ``+uniqueId`` (upstream)
    - descendants: ``uniqueId+`` (downstream)
    - both: ``+uniqueId+`` (both directions)

    Args:
        unique_id: The dbt unique ID of the target resource.
        direction: A ``LineageDirection`` member or its plain string value.

    Returns:
        The selector with ``+`` added on the side(s) to traverse.

    Raises:
        ValueError: If ``direction`` is not a valid ``LineageDirection`` value.
    """
    # Validate by coercion rather than with ``in``: before Python 3.12 an enum
    # containment check on a plain string raises TypeError, so the intended
    # ValueError would never be reached for string input.
    try:
        resolved = LineageDirection(direction)
    except ValueError:
        raise ValueError(
            f"Invalid direction: {direction}. "
            f"Must be one of: {', '.join(d.value for d in LineageDirection)}"
        ) from None

    if resolved == LineageDirection.ANCESTORS:
        return f"+{unique_id}"
    if resolved == LineageDirection.DESCENDANTS:
        return f"{unique_id}+"
    # Only BOTH remains.
    return f"+{unique_id}+"
|
|
||
| async def fetch_lineage( | ||
| self, | ||
| unique_id: str, | ||
| direction: LineageDirection = LineageDirection.BOTH, | ||
| types: list[LineageResourceType] | None = None, | ||
| ) -> dict: | ||
| """Fetch lineage for a resource. | ||
|
|
||
| Args: | ||
| unique_id: The dbt unique ID of the resource | ||
| direction: One of 'ancestors', 'descendants', or 'both' | ||
| types: Optional list of resource types to filter results | ||
|
|
||
| Returns: | ||
| Dict with 'target', 'ancestors', and/or 'descendants' keys | ||
| """ | ||
| if direction not in LineageDirection: | ||
| raise ValueError( | ||
| f"Invalid direction: {direction}. " | ||
| f"Must be one of: {', '.join(d.value for d in LineageDirection)}" | ||
| ) | ||
|
|
||
| if types is not None: | ||
| invalid_types = [t for t in types if t not in LineageResourceType] | ||
| if invalid_types: | ||
| raise ValueError( | ||
| f"Invalid resource type(s): {invalid_types}. " | ||
| f"Valid types are: {', '.join(rt.value for rt in LineageResourceType)}" | ||
| ) | ||
|
|
||
|
|
||
| if direction == LineageDirection.BOTH: | ||
| ancestors_result, descendants_result = await asyncio.gather( | ||
| self._fetch_lineage_single_direction( | ||
| unique_id, LineageDirection.ANCESTORS, types | ||
| ), | ||
| self._fetch_lineage_single_direction( | ||
| unique_id, LineageDirection.DESCENDANTS, types | ||
| ), | ||
| ) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is the reason for the two API calls to simplify labeling ancestors and descendants? The selector syntax allows for both directions here, but I assume the separate calls make it easier to label the nodes. Is that right? One suggestion: if we keep two separate calls, they can be done concurrently with asyncio.gather().
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yup! The reason for the two calls is so we can group them into ancestors vs descendants. When using +uniqueId+, the API returns everything mixed together, so we'd need extra logic to figure out which nodes are ancestors vs descendants. I have implemented the asyncio.gather() for parallel execution. |
||
| target = ancestors_result.get("target") or descendants_result.get("target") | ||
|
|
||
| ancestors_pagination = ancestors_result.get("pagination", {}) | ||
| descendants_pagination = descendants_result.get("pagination", {}) | ||
|
|
||
| return { | ||
| "target": target, | ||
| "ancestors": ancestors_result.get("ancestors", []), | ||
| "descendants": descendants_result.get("descendants", []), | ||
| "pagination": { | ||
| "limit": LINEAGE_LIMIT, | ||
| "ancestors_total": ancestors_pagination.get("ancestors_total", 0), | ||
| "ancestors_truncated": ancestors_pagination.get( | ||
| "ancestors_truncated", False | ||
| ), | ||
| "descendants_total": descendants_pagination.get( | ||
| "descendants_total", 0 | ||
| ), | ||
| "descendants_truncated": descendants_pagination.get( | ||
| "descendants_truncated", False | ||
| ), | ||
| }, | ||
| } | ||
| else: | ||
| return await self._fetch_lineage_single_direction(unique_id, direction, types) | ||
|
|
||
async def _fetch_lineage_single_direction(
    self,
    unique_id: str,
    direction: LineageDirection,
    types: list[LineageResourceType] | None = None,
) -> dict:
    """Run one lineage query for a single direction and shape the result.

    Internal helper for ``fetch_lineage``.

    Args:
        unique_id: The dbt unique ID of the target resource.
        direction: The direction to traverse; it is turned into a selector
            by ``_build_selector``.
        types: Optional resource types to restrict the lineage to.  An empty
            list is treated like ``None`` (no ``types`` filter is sent).

    Returns:
        The dict produced by ``_transform_lineage_response``.  When the API
        returns no lineage nodes, the result is instead
        ``{"target": None, "ancestors": [], "descendants": []}``, which has
        no ``pagination`` key.
    """
    lineage_filter: dict = {
        "uniqueIds": [self._build_selector(unique_id, direction)],
    }
    if types:
        lineage_filter["types"] = types

    environment_id = await self.get_environment_id()
    response = await self.api_client.execute_query(
        GraphQLQueries.GET_LINEAGE,
        {"environmentId": environment_id, "filter": lineage_filter},
    )
    raise_gql_error(response)

    raw_nodes = response["data"]["environment"]["applied"]["lineage"]
    if raw_nodes:
        return self._transform_lineage_response(raw_nodes, direction)
    return {"target": None, "ancestors": [], "descendants": []}
|
|
||
def _transform_lineage_response(self, nodes: list[dict], direction: str) -> dict:
    """Split a raw lineage node list into the target and its related nodes.

    A node with a truthy ``matchesMethod`` is the target (if several match,
    the last one wins).  All other nodes are lineage nodes, capped at
    ``LINEAGE_LIMIT`` entries.  Pagination metadata reports the total before
    capping and whether anything was cut off.

    Only single-direction queries are handled here.  The "both" direction is
    assembled by ``fetch_lineage`` from two separate calls, so any direction
    other than ancestors or descendants yields just ``{"target": ...}``.
    """
    matched = [node for node in nodes if node.get("matchesMethod")]
    related = [node for node in nodes if not node.get("matchesMethod")]

    found_total = len(related)
    was_cut = found_total > LINEAGE_LIMIT
    kept = related[:LINEAGE_LIMIT]

    shaped: dict = {"target": matched[-1] if matched else None}

    if direction == LineageDirection.ANCESTORS:
        shaped["ancestors"] = kept
        shaped["pagination"] = {
            "limit": LINEAGE_LIMIT,
            "ancestors_total": found_total,
            "ancestors_truncated": was_cut,
        }
        return shaped

    if direction == LineageDirection.DESCENDANTS:
        shaped["descendants"] = kept
        shaped["pagination"] = {
            "limit": LINEAGE_LIMIT,
            "descendants_total": found_total,
            "descendants_truncated": was_cut,
        }

    return shaped
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment on line 765 states "Using 'model' resource for package fetching works for all types" but doesn't explain why this is the case. Add a brief explanation of why the "model" resource can be used for all resource types, as this is non-obvious and important for maintainability.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed comments