Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions data_rentgen/server/services/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ async def get_lineage_by_jobs( # noqa: C901, PLR0912, PLR0915
}

if level == 0:
result.merge(await self._populate_parents(result.runs.keys(), granularity=granularity))
result.merge(await self._populate_parents(result, granularity=granularity))
if include_column_lineage:
result.column_lineage.update(
await self._get_column_lineage(
Expand Down Expand Up @@ -459,7 +459,7 @@ async def get_lineage_by_runs( # noqa: C901, PLR0915, PLR0912
}

if level == 0:
result.merge(await self._populate_parents(result.runs.keys(), granularity=granularity))
result.merge(await self._populate_parents(result, granularity=granularity))
if include_column_lineage:
result.column_lineage.update(
await self._get_column_lineage(current_result=result, since=since, until=until, granularity="RUN"),
Expand Down Expand Up @@ -620,7 +620,7 @@ async def get_lineage_by_operations( # noqa: C901, PLR0912, PLR0915
}

if level == 0:
result.merge(await self._populate_parents(result.runs.keys(), granularity="OPERATION"))
result.merge(await self._populate_parents(result, granularity="OPERATION"))
if include_column_lineage:
result.column_lineage.update(
await self._get_column_lineage(
Expand Down Expand Up @@ -741,7 +741,7 @@ async def get_lineage_by_datasets( # noqa: C901
raise ValueError(msg)

if level == 0:
result.merge(await self._populate_parents(result.runs.keys(), granularity=granularity))
result.merge(await self._populate_parents(result, granularity=granularity))
if include_column_lineage:
result.column_lineage.update(
await self._get_column_lineage(
Expand Down Expand Up @@ -1328,15 +1328,13 @@ async def _get_column_lineage(

async def _populate_parents(
self,
run_ids: Collection[UUID],
result: LineageServiceResult,
granularity: Literal["OPERATION", "RUN", "JOB", "DATASET"],
) -> LineageServiceResult:
"""Returns a LineageServiceResult with only run_parent_relations or job_parent_relations populated."""
if not run_ids:
return LineageServiceResult()
match granularity:
case "RUN" | "OPERATION":
relations = await self._uow.run.list_ancestor_relations(run_ids)
relations = await self._uow.run.list_ancestor_relations(result.runs.keys())
parents_run_ids = {p_id for p_id, _ in relations}
runs = await self._uow.run.list_by_ids(parents_run_ids)
runs_by_id = {run.id: run for run in runs}
Expand All @@ -1348,8 +1346,17 @@ async def _populate_parents(
runs=runs_by_id,
jobs=jobs_by_id,
)

case "JOB":
return LineageServiceResult()
relations = await self._uow.job.list_ancestor_relations(result.jobs.keys())
parents_job_ids = {p_id for p_id, _ in relations}
jobs = await self._uow.job.list_by_ids(parents_job_ids)
jobs_by_id = {job.id: job for job in jobs}
return LineageServiceResult(
job_ancestor_relations={tuple(r) for r in relations},
jobs=jobs_by_id,
)

case "DATASET":
return LineageServiceResult()
case _:
Expand Down
1 change: 1 addition & 0 deletions docs/changelog/next_release/401.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Include job ancestor relations in lineage response when granularity is ``JOB``. The ``ancestors`` field now contains job→job ancestry chains for job-level lineage in addition to run→run chains for run/operation granularity.
45 changes: 45 additions & 0 deletions tests/test_server/test_lineage/test_job_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,51 @@ async def test_get_job_lineage_for_long_running_operations(
}


async def test_get_job_lineage_with_ancestor_relations(
test_client: AsyncClient,
async_session: AsyncSession,
lineage_with_parent_run_relations: LineageResult,
mocked_user: MockedUser,
):
lineage = lineage_with_parent_run_relations
run = lineage.runs[-1]
job = next(job for job in lineage.jobs if run.job_id == job.id)
since = run.created_at

jobs = await enrich_jobs(lineage.jobs, async_session)
datasets = await enrich_datasets(lineage.datasets, async_session)

response = await test_client.get(
"v1/jobs/lineage",
headers={"Authorization": f"Bearer {mocked_user.access_token}"},
params={
"since": since.isoformat(),
"start_node_id": job.id,
},
)

assert response.status_code == HTTPStatus.OK, response.json()
assert response.json() == {
"relations": {
"parents": [],
"ancestors": jobs_ancestors_to_json(jobs),
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(lineage.inputs), granularity="JOB"),
],
"outputs": [],
"direct_column_lineage": [],
"indirect_column_lineage": [],
},
"nodes": {
"datasets": datasets_to_json(datasets),
"runs": {},
"jobs": jobs_to_json(jobs),
"operations": {},
},
}


async def test_get_job_lineage_with_granularity_run_and_ancestor_relations(
test_client: AsyncClient,
async_session: AsyncSession,
Expand Down
Loading