# --- src/dbt_mcp/dbt_cli/models/lineage_types.py ---


class Descendant(BaseModel):
    """A downstream node in a lineage tree."""

    # unique_id of the node, taken from the manifest's child_map
    model_id: str
    # nodes that depend on this one; stays empty when lineage is not recursive
    children: list[Descendant] = Field(default_factory=list)


class Ancestor(BaseModel):
    """An upstream node in a lineage tree."""

    # unique_id of the node, taken from the manifest's parent_map
    model_id: str
    # nodes this one depends on; stays empty when lineage is not recursive
    parents: list[Ancestor] = Field(default_factory=list)


class ModelLineage(BaseModel):
    """Upstream and/or downstream lineage of a single manifest node."""

    # resolved unique_id of the node the lineage was built for
    model_id: str
    parents: list[Ancestor] = Field(default_factory=list)
    children: list[Descendant] = Field(default_factory=list)

    @classmethod
    def from_manifest(
        cls,
        manifest: Manifest,
        model_id: str,
        direction: Literal["parents", "children", "both"] = "both",
        exclude_prefixes: tuple[str, ...] = ("test.", "unit_test."),
        *,
        recursive: bool = False,
    ) -> ModelLineage:
        """Build a ModelLineage from a dbt manifest.

        Args:
            manifest: Manifest providing ``parent_map`` and/or ``child_map``.
            model_id: unique_id or plain name of the node to start from; it
                is resolved with ``get_uid_from_name``.
            direction: ``"parents"``, ``"children"`` or ``"both"``. The list
                for a direction that was not requested is left empty.
            exclude_prefixes: nodes whose id starts with any of these
                prefixes are left out (in both directions, recursive or
                not). Pass ``()`` to include everything.
            recursive: when True, follow the maps transitively; otherwise
                only the direct parents/children are returned.

        Returns:
            The lineage rooted at the resolved unique_id.

        Raises:
            ValueError: if ``model_id`` cannot be resolved in the manifest.
        """
        # str.startswith accepts only a str or a tuple of str, so normalise
        # lists (e.g. decoded JSON arrays) and guard against a bare string
        # being split into single characters.
        if isinstance(exclude_prefixes, str):
            excluded: tuple[str, ...] = (exclude_prefixes,)
        else:
            excluded = tuple(exclude_prefixes)

        root_id = get_uid_from_name(manifest, model_id)

        def _linked(
            node_id: str, edges: dict[str, list[str]], path: set[str]
        ) -> list[str]:
            # Direct neighbours of node_id, minus excluded ids and ids already
            # on the current path (cycle guard), in manifest order.
            return [
                next_id
                for next_id in edges.get(node_id, [])
                if next_id not in path and not next_id.startswith(excluded)
            ]

        def _ancestors(node_id: str, path: set[str]) -> list[Ancestor]:
            return [
                Ancestor(
                    model_id=parent_id,
                    parents=(
                        _ancestors(parent_id, path | {parent_id})
                        if recursive
                        else []
                    ),
                )
                for parent_id in _linked(node_id, manifest.parent_map, path)
            ]

        def _descendants(node_id: str, path: set[str]) -> list[Descendant]:
            return [
                Descendant(
                    model_id=child_id,
                    children=(
                        _descendants(child_id, path | {child_id})
                        if recursive
                        else []
                    ),
                )
                for child_id in _linked(node_id, manifest.child_map, path)
            ]

        want_parents = direction in ("both", "parents")
        want_children = direction in ("both", "children")
        return cls(
            model_id=root_id,
            parents=_ancestors(root_id, {root_id}) if want_parents else [],
            children=_descendants(root_id, {root_id}) if want_children else [],
        )


def get_uid_from_name(manifest: Manifest, model_id: str) -> str:
    """Resolve a unique_id or plain name to a unique_id in the manifest.

    ``model_id`` is returned unchanged when it is already a key of
    ``child_map`` or ``parent_map``. Otherwise the first match wins, in this
    order: node ``name``, source ``identifier``, exposure ``name``.

    Raises:
        ValueError: if nothing in the manifest matches ``model_id``.
    """
    # Check the parent/child maps first so that sources and exposures given
    # by unique_id are accepted as well as models.
    if model_id in manifest.child_map or model_id in manifest.parent_map:
        return model_id
    # Fallback: look through everything for a matching name/identifier.
    for uid, node in manifest.nodes.items():
        if node.name == model_id:
            return uid
    for uid, source in manifest.sources.items():
        if source.identifier == model_id:
            return uid
    for uid, exposure in manifest.exposures.items():
        if exposure.name == model_id:
            return uid
    raise ValueError(f"Model name '{model_id}' not found in manifest.")


# --- src/dbt_mcp/dbt_cli/models/manifest.py ---
# This is a SUPER simplified version of the dbt manifest.json structure,
# only including the fields we need.


class Node(BaseModel):
    """Entry of the manifest's ``nodes`` mapping (only ``name`` is kept)."""

    name: str


class Source(BaseModel):
    """Entry of the manifest's ``sources`` mapping (only ``identifier`` is kept)."""

    identifier: str


class Exposure(BaseModel):
    """Entry of the manifest's ``exposures`` mapping (only ``name`` is kept)."""

    name: str


class Manifest(BaseModel):
    """Subset of manifest.json used for lineage lookups.

    Every field defaults to an empty dict, so a partial manifest still loads.
    """

    # unique_id -> unique_ids of its direct parents
    parent_map: dict[str, list[str]] = Field(default_factory=dict)
    # unique_id -> unique_ids of its direct children
    child_map: dict[str, list[str]] = Field(default_factory=dict)
    nodes: dict[str, Node] = Field(default_factory=dict)
    sources: dict[str, Source] = Field(default_factory=dict)
    exposures: dict[str, Exposure] = Field(default_factory=dict)
# NOTE(review): both functions appear to be closures inside the dbt CLI
# tool-registration function; `config` and `_run_dbt_command` are expected to
# come from that enclosing scope -- confirm when splicing back in.
def _get_manifest() -> Manifest:
    """Run ``dbt parse`` and load the resulting ``target/manifest.json``.

    Raises:
        FileNotFoundError: if no manifest exists at the expected path after
            the parse command has run.
    """
    _run_dbt_command(["parse"])  # Ensure manifest is generated
    # A relative project_dir falls back to the current working directory.
    # NOTE(review): presumably mirrors how _run_dbt_command picks its cwd, and
    # assumes the default "target" directory -- verify both.
    cwd_path = config.project_dir if os.path.isabs(config.project_dir) else None
    manifest_path = os.path.join(cwd_path or ".", "target", "manifest.json")
    # JSON is UTF-8; do not rely on the platform's default encoding.
    with open(manifest_path, encoding="utf-8") as f:
        manifest_data = json.load(f)
    return Manifest(**manifest_data)


def get_model_lineage_dev(
    model_id: str,
    direction: Literal["parents", "children", "both"] = "both",
    exclude_prefixes: tuple[str, ...] = ("test.", "unit_test."),
    *,
    recursive: bool = False,
) -> dict[str, Any]:
    """Return the lineage of a model from the local development manifest.

    Args:
        model_id: unique_id or name of the model to look up.
        direction: "parents", "children" or "both".
        exclude_prefixes: id prefixes to leave out of the result.
        recursive: follow lineage transitively instead of one level only.
            Defaults to False, matching the tool prompt, so callers may
            omit it.

    Returns:
        The ``ModelLineage`` for the model, dumped to a plain dict.
    """
    manifest = _get_manifest()
    model_lineage = ModelLineage.from_manifest(
        manifest,
        model_id,
        direction=direction,
        exclude_prefixes=exclude_prefixes,
        recursive=recursive,
    )
    return model_lineage.model_dump()
Assets with identifiers starting with any of these prefixes will be ignored. +recursive: bool = False => Whether to retrieve lineage recursively. If set to True, it will fetch all levels of lineage in the specified direction(s). + + + +1. Getting children for a model by name: + get_model_lineage_dev(model_id="customer_orders", direction="children") + +2. Getting parents for a model by uniqueId (more precise): + get_model_lineage_dev(model_id="model.my_project.customer_orders", direction="parents") + +3. Getting both upstream and downstream lineage recursively and including tests: + get_model_lineage_dev(model_id="model.my_project.customer_orders", direction="both", exclude_prefixes=(), recursive=True) + diff --git a/src/dbt_mcp/prompts/discovery/get_model_children.md b/src/dbt_mcp/prompts/discovery/get_model_children.md index 3d1d7797..1d18ed7d 100644 --- a/src/dbt_mcp/prompts/discovery/get_model_children.md +++ b/src/dbt_mcp/prompts/discovery/get_model_children.md @@ -2,6 +2,8 @@ Retrieves the child models (downstream dependencies) of a specific dbt model. These are the models that depend on the specified model. You can provide either a model_name or a uniqueId, if known, to identify the model. Using uniqueId is more precise and guarantees a unique match, which is especially useful when models might have the same name in different projects. + +This is specifically for retrieving model children from the production manifest. If you want development lineage, use `get_model_lineage_dev` instead. diff --git a/src/dbt_mcp/prompts/discovery/get_model_parents.md b/src/dbt_mcp/prompts/discovery/get_model_parents.md index caeae027..c75570e3 100644 --- a/src/dbt_mcp/prompts/discovery/get_model_parents.md +++ b/src/dbt_mcp/prompts/discovery/get_model_parents.md @@ -4,6 +4,8 @@ Retrieves the parent models of a specific dbt model. These are the models that t You can provide either a model_name or a uniqueId, if known, to identify the model. 
Using uniqueId is more precise and guarantees a unique match, which is especially useful when models might have the same name in different projects. Returned parents include `resourceType`, `name`, and `description`. For upstream sources, also provide `sourceName` and `uniqueId` so lineage can be linked back via `get_all_sources`. + +This is specifically for retrieving model parents from the production manifest. If you want development lineage, use `get_model_lineage_dev` instead. diff --git a/src/dbt_mcp/tools/policy.py b/src/dbt_mcp/tools/policy.py index b9504d7d..fa39f6c7 100644 --- a/src/dbt_mcp/tools/policy.py +++ b/src/dbt_mcp/tools/policy.py @@ -53,6 +53,9 @@ class ToolPolicy: ToolName.PARSE.value: ToolPolicy( name=ToolName.PARSE.value, behavior=ToolBehavior.METADATA ), + ToolName.GET_MODEL_LINEAGE_DEV.value: ToolPolicy( + name=ToolName.GET_MODEL_LINEAGE_DEV.value, behavior=ToolBehavior.METADATA + ), # Semantic Layer tools ToolName.LIST_METRICS.value: ToolPolicy( name=ToolName.LIST_METRICS.value, behavior=ToolBehavior.METADATA diff --git a/src/dbt_mcp/tools/tool_names.py b/src/dbt_mcp/tools/tool_names.py index d52102dc..7a5a85a9 100644 --- a/src/dbt_mcp/tools/tool_names.py +++ b/src/dbt_mcp/tools/tool_names.py @@ -13,6 +13,7 @@ class ToolName(Enum): RUN = "run" TEST = "test" SHOW = "show" + GET_MODEL_LINEAGE_DEV = "get_model_lineage_dev" # Semantic Layer tools LIST_METRICS = "list_metrics" diff --git a/src/dbt_mcp/tools/toolsets.py b/src/dbt_mcp/tools/toolsets.py index bf9d2431..c0b9b117 100644 --- a/src/dbt_mcp/tools/toolsets.py +++ b/src/dbt_mcp/tools/toolsets.py @@ -75,6 +75,7 @@ class Toolset(Enum): ToolName.RUN, ToolName.TEST, ToolName.SHOW, + ToolName.GET_MODEL_LINEAGE_DEV, }, Toolset.ADMIN_API: { ToolName.LIST_JOBS, diff --git a/tests/unit/dbt_cli/test_model_lineage.py b/tests/unit/dbt_cli/test_model_lineage.py new file mode 100644 index 00000000..da1e5da5 --- /dev/null +++ b/tests/unit/dbt_cli/test_model_lineage.py @@ -0,0 +1,91 @@ +import pytest + 
@pytest.fixture
def sample_manifest():
    """Small manifest: source.1 -> a -> (b -> d, c), with a test attached to b."""
    data = {
        "child_map": {
            "model.a": ["model.b", "model.c"],
            "model.b": ["model.d", "test.not_included"],
            "model.c": [],
            "model.d": [],
            "source.1": ["model.a"],
        },
        "parent_map": {
            "model.b": ["model.a"],
            "model.c": ["model.a"],
            "model.d": ["model.b"],
            "model.a": ["source.1"],
            "source.1": [],
        },
        "nodes": {
            "model.a": {"name": "a"},
            "model.b": {"name": "b"},
            "model.c": {"name": "c"},
            "model.d": {"name": "d"},
        },
        "sources": {
            "source.1": {"identifier": "1"},
        },
        "exposures": {
            "exposure.1": {"name": "1"},
        },
    }
    # No teardown is needed, so a plain return is enough.
    return Manifest(**data)


@pytest.mark.parametrize(
    "model_id",
    [
        pytest.param("model.a", id="using_full_model_id"),
        pytest.param("a", id="using_model_name_only"),
    ],
)
def test_model_lineage_a__from_manifest(sample_manifest, model_id):
    """Both directions, recursive: tests are excluded by default."""
    lineage = ModelLineage.from_manifest(
        sample_manifest, model_id, direction="both", recursive=True
    )
    assert lineage.model_id == "model.a"
    assert lineage.parents[0].model_id == "source.1", (
        "Expected source.1 as parent to model.a"
    )
    assert len(lineage.children) == 2, "Expected 2 children for model.a"
    model_b = lineage.children[0]
    assert model_b.model_id == "model.b", "Expected model.b as first child of model.a"
    assert len(model_b.children) == 1, (
        "Expect test.not_included to be excluded from children of model.b"
    )
    assert model_b.children[0].model_id == "model.d", (
        "Expected model.d as child of model.b"
    )


def test_model_lineage_b__from_manifest(sample_manifest):
    """Parents only, recursive: the chain b -> a -> source.1 is returned."""
    lineage_b = ModelLineage.from_manifest(
        sample_manifest, "model.b", direction="parents", recursive=True
    )
    assert lineage_b.model_id == "model.b"
    assert len(lineage_b.parents) == 1, "Expected 1 parent for model.b"
    model_a = lineage_b.parents[0]
    assert model_a.model_id == "model.a", "Expected model.a as parent of model.b"
    assert [p.model_id for p in model_a.parents] == ["source.1"], (
        "Expected source.1 as grandparent of model.b"
    )

    assert len(lineage_b.children) == 0, (
        "Expected no children when only fetching parents"
    )


def test_model_lineage__from_manifest_with_tests(sample_manifest):
    """An empty exclude_prefixes keeps test nodes in the result."""
    lineage = ModelLineage.from_manifest(
        sample_manifest,
        "model.a",
        direction="children",
        recursive=True,
        exclude_prefixes=(),
    )
    assert len(lineage.children) == 2, "Expected 2 children for model.a"
    model_b = lineage.children[0]
    assert model_b.model_id == "model.b", "Expected model.b as first child of model.a"
    assert len(model_b.children) == 2, "Expected 2 children for model.b including tests"
    assert lineage.children[0].children[1].model_id == "test.not_included"
    assert len(lineage.parents) == 0, "Expected no parents when only fetching children"


def test_model_lineage__non_recursive_returns_direct_links_only(sample_manifest):
    """Without recursive=True only one level is returned in each direction."""
    lineage = ModelLineage.from_manifest(sample_manifest, "model.a", direction="both")
    assert [p.model_id for p in lineage.parents] == ["source.1"]
    assert [c.model_id for c in lineage.children] == ["model.b", "model.c"]
    assert all(not c.children for c in lineage.children), (
        "Expected direct children not to be expanded"
    )


def test_model_lineage__resolves_source_by_identifier(sample_manifest):
    """A source can be looked up by its identifier."""
    lineage = ModelLineage.from_manifest(sample_manifest, "1", direction="children")
    assert lineage.model_id == "source.1"
    assert [c.model_id for c in lineage.children] == ["model.a"]


def test_model_lineage__unknown_model_raises(sample_manifest):
    """A name that matches nothing in the manifest is reported as ValueError."""
    with pytest.raises(ValueError):
        ModelLineage.from_manifest(sample_manifest, "does_not_exist")