Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion data_rentgen/server/schemas/v1/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ class IndirectLineageColumnRelationV1(BaseModel):

class LineageRelationsResponseV1(BaseModel):
parents: list[LineageParentRelationV1] = Field(description="Parent relations", default_factory=list)
ancestors: list[LineageParentRelationV1] = Field(description="Ancestors relations", default_factory=list)
symlinks: list[LineageSymlinkRelationV1] = Field(description="Symlink relations", default_factory=list)
inputs: list[LineageInputRelationV1] = Field(description="Input relations", default_factory=list)
outputs: list[LineageOutputRelationV1] = Field(description="Output relations", default_factory=list)
Expand Down
11 changes: 6 additions & 5 deletions data_rentgen/server/utils/lineage_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ def build_lineage_response(lineage: LineageServiceResult) -> LineageResponseV1:
operations=operations, # type: ignore[assignment, arg-type]
),
relations=LineageRelationsResponseV1(
parents=_get_run_parent_relations(lineage.runs) + _get_operation_parent_relations(lineage.operations),
ancestors=_get_jobs_hierarchy_chain(lineage.job_ancestor_relations)
+ _get_runs_hierarchy_chain(lineage.run_ancestor_relations),
parents=_get_jobs_ancestor_relations(lineage.job_ancestor_relations)
+ _get_run_ancestor_relations(lineage.run_ancestor_relations)
+ _get_run_parent_relations(lineage.runs)
+ _get_operation_parent_relations(lineage.operations),
symlinks=_get_symlink_relations(lineage.dataset_symlinks),
inputs=_get_input_relations(lineage.inputs),
outputs=_get_output_relations(lineage.outputs),
Expand Down Expand Up @@ -334,7 +335,7 @@ def _get_datasets_with_dataset_granularity(
return datasets


def _get_runs_hierarchy_chain(runs_relations: set[tuple[UUID, UUID]]) -> list[LineageParentRelationV1]:
def _get_run_ancestor_relations(runs_relations: set[tuple[UUID, UUID]]) -> list[LineageParentRelationV1]:
parents = []
for parent_run_id, run_id in runs_relations:
relation = LineageParentRelationV1(
Expand All @@ -345,7 +346,7 @@ def _get_runs_hierarchy_chain(runs_relations: set[tuple[UUID, UUID]]) -> list[Li
return sorted(parents, key=lambda x: (x.from_.id, x.to.id))


def _get_jobs_hierarchy_chain(jobs_relations: set[tuple[int, int]]) -> list[LineageParentRelationV1]:
def _get_jobs_ancestor_relations(jobs_relations: set[tuple[int, int]]) -> list[LineageParentRelationV1]:
parents = []
for parent_job_id, job_id in jobs_relations:
relation = LineageParentRelationV1(
Expand Down
2 changes: 0 additions & 2 deletions docs/changelog/next_release/392.improvement.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ Added ``ancestors`` field to lineage ``relations`` response. It contains run→r
"parents": [
{"from": {"kind": "JOB", "id": "1"}, "to": {"kind": "RUN", "id": "run-uuid"}},
{"from": {"kind": "RUN", "id": "run-uuid"}, "to": {"kind": "OPERATION", "id": "op-uuid"}}
],
"ancestors": [
{"from": {"kind": "RUN", "id": "parent-run-uuid"}, "to": {"kind": "RUN", "id": "run-uuid"}}
],
"symlinks": [],
Expand Down
8 changes: 4 additions & 4 deletions docs/entities/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Entities
[Operation] --> [Run]: PARENT
[Run] --> [User]: started by
[Run] --> [Run]: PARENT
[Job] --> [Job]: PARENT
[Dataset1] ..> [Operation]: INPUT
[Operation] ..> [Dataset1]: OUTPUT

Expand Down Expand Up @@ -309,15 +310,14 @@ Parent Relation

Relation between a child job/run/operation and its parent. For example:

- Spark applicationName is parent for all its runs (applicationId).
- Spark applicationId is parent for all its Spark jobs or Spark executions.
- Spark job (applicationName) is parent for all its runs (applicationId).
- Airflow DAG is parent of Airflow task.
- Airflow Task Instance triggered a Spark applicationId, dbt run, and so on.
- Airflow Task Instance can start a Spark run (applicationId), dbt run, and so on.

It contains following fields:

- ``from: Job | Run`` - parent entity.
- ``to: Run | Operation`` - child entity.
- ``to: Job | Run | Operation`` - child entity.

.. image:: parent.png

Expand Down
12 changes: 0 additions & 12 deletions tests/test_server/test_lineage/test_column_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ async def test_get_dataset_lineage_with_empty_column_lineage(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs),
"ancestors": [],
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
Expand Down Expand Up @@ -129,7 +128,6 @@ async def test_get_operation_lineage_include_columns_with_combined_transformatio
assert response.json() == {
"relations": {
"parents": run_parents_to_json([run]) + operation_parents_to_json([operation]),
"ancestors": [],
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
Expand Down Expand Up @@ -223,7 +221,6 @@ async def test_get_run_lineage_include_columns_with_combined_transformations(
assert response.json() == {
"relations": {
"parents": run_parents_to_json([run]),
"ancestors": [],
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
Expand Down Expand Up @@ -316,7 +313,6 @@ async def test_get_job_lineage_include_columns_with_combined_transformations(
assert response.json() == {
"relations": {
"parents": [],
"ancestors": [],
"symlinks": [],
"inputs": inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
"outputs": outputs_to_json(merge_io_by_jobs(outputs), granularity="JOB"),
Expand Down Expand Up @@ -441,7 +437,6 @@ async def test_get_dataset_lineage_include_columns_with_depth_and_granularity_ru
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs),
"ancestors": [],
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
Expand Down Expand Up @@ -564,7 +559,6 @@ async def test_get_dataset_lineage_include_columns_with_depth_and_granularity_jo
assert response.json() == {
"relations": {
"parents": [],
"ancestors": [],
"symlinks": [],
"inputs": inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
"outputs": outputs_to_json(merge_io_by_jobs(outputs), granularity="JOB"),
Expand Down Expand Up @@ -694,7 +688,6 @@ async def test_get_dataset_lineage_include_columns_with_depth_and_granularity_op
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs) + operation_parents_to_json(operations),
"ancestors": [],
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
Expand Down Expand Up @@ -833,7 +826,6 @@ async def test_get_operation_lineage_include_columns_with_depth(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs) + operation_parents_to_json(operations),
"ancestors": [],
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
Expand Down Expand Up @@ -994,7 +986,6 @@ async def test_get_run_lineage_include_columns_with_depth(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs),
"ancestors": [],
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
Expand Down Expand Up @@ -1150,7 +1141,6 @@ async def test_get_job_lineage_include_columns_with_depth(
assert response.json() == {
"relations": {
"parents": [],
"ancestors": [],
"symlinks": [],
"inputs": inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
"outputs": outputs_to_json(merge_io_by_jobs(outputs), granularity="JOB"),
Expand Down Expand Up @@ -1262,7 +1252,6 @@ async def test_get_dataset_lineage_with_granularity_dataset_and_column_lineage(
assert response.json() == {
"relations": {
"parents": [],
"ancestors": [],
"symlinks": [],
"outputs": [],
"inputs": sorted(
Expand Down Expand Up @@ -1395,7 +1384,6 @@ async def test_get_dataset_lineage_with_granularity_dataset_and_column_lineage_f
assert response.json() == {
"relations": {
"parents": [],
"ancestors": [],
"symlinks": [],
"outputs": [],
"inputs": sorted(
Expand Down
26 changes: 0 additions & 26 deletions tests/test_server/test_lineage/test_dataset_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ async def test_get_dataset_lineage_no_relations(
assert response.json() == {
"relations": {
"parents": [],
"ancestors": [],
"symlinks": [],
"inputs": [],
"outputs": [],
Expand Down Expand Up @@ -141,7 +140,6 @@ async def test_get_dataset_lineage_with_granularity_run(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs),
"ancestors": [],
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
Expand Down Expand Up @@ -201,7 +199,6 @@ async def test_get_dataset_lineage_with_granularity_job(
assert response.json() == {
"relations": {
"parents": [],
"ancestors": [],
"symlinks": [],
"inputs": inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
"outputs": outputs_to_json(merge_io_by_jobs(outputs), granularity="JOB"),
Expand Down Expand Up @@ -264,7 +261,6 @@ async def test_get_dataset_lineage_with_granularity_operation(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs) + operation_parents_to_json(operations),
"ancestors": [],
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
Expand Down Expand Up @@ -319,7 +315,6 @@ async def test_get_dataset_lineage_with_granularity_dataset(
assert response.json() == {
"relations": {
"parents": [],
"ancestors": [],
"symlinks": [],
"outputs": [],
"inputs": sorted(
Expand Down Expand Up @@ -396,7 +391,6 @@ async def test_get_dataset_lineage_with_granularity_dataset_and_direction(
assert response.json() == {
"relations": {
"parents": [],
"ancestors": [],
"symlinks": [],
"outputs": [],
"inputs": sorted(
Expand Down Expand Up @@ -465,7 +459,6 @@ async def test_get_dataset_lineage_with_granularity_dataset_and_depth(
assert response.json() == {
"relations": {
"parents": [],
"ancestors": [],
"symlinks": [],
"outputs": [],
"inputs": sorted(
Expand Down Expand Up @@ -539,7 +532,6 @@ async def test_get_dataset_lineage_with_granularity_dataset_and_symlinks(
assert response.json() == {
"relations": {
"parents": [],
"ancestors": [],
"symlinks": symlinks_to_json(dataset_symlinks),
"outputs": [],
"inputs": sorted(
Expand Down Expand Up @@ -615,7 +607,6 @@ async def test_get_dataset_lineage_with_granularity_dataset_and_until(
assert response.json() == {
"relations": {
"parents": [],
"ancestors": [],
"symlinks": [],
"outputs": [],
"inputs": sorted(
Expand Down Expand Up @@ -691,7 +682,6 @@ async def test_get_dataset_lineage_with_direction_downstream(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs),
"ancestors": [],
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
Expand Down Expand Up @@ -750,7 +740,6 @@ async def test_get_dataset_lineage_with_direction_upstream(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs),
"ancestors": [],
"symlinks": [],
"inputs": [],
"outputs": [
Expand Down Expand Up @@ -817,7 +806,6 @@ async def test_get_dataset_lineage_with_until(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs),
"ancestors": [],
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
Expand Down Expand Up @@ -916,7 +904,6 @@ async def test_get_dataset_lineage_with_depth(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs),
"ancestors": [],
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
Expand Down Expand Up @@ -1008,7 +995,6 @@ async def test_get_dataset_lineage_with_depth_and_granularity_job(
assert response.json() == {
"relations": {
"parents": [],
"ancestors": [],
"symlinks": [],
"inputs": inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
"outputs": outputs_to_json(merge_io_by_jobs(outputs), granularity="JOB"),
Expand Down Expand Up @@ -1107,7 +1093,6 @@ async def test_get_dataset_lineage_with_depth_and_granularity_operation(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs) + operation_parents_to_json(operations),
"ancestors": [],
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
Expand Down Expand Up @@ -1164,7 +1149,6 @@ async def test_get_dataset_lineage_with_depth_ignore_cycles(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs),
"ancestors": [],
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(lineage.inputs), granularity="JOB"),
Expand Down Expand Up @@ -1249,7 +1233,6 @@ async def test_get_dataset_lineage_with_depth_ignore_unrelated_datasets(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs),
"ancestors": [],
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
Expand Down Expand Up @@ -1329,7 +1312,6 @@ async def test_get_dataset_lineage_with_symlink(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs),
"ancestors": [],
"symlinks": symlinks_to_json(dataset_symlinks),
"inputs": [
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
Expand Down Expand Up @@ -1410,7 +1392,6 @@ async def test_get_dataset_lineage_with_symlink_without_input_output(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs),
"ancestors": [],
"symlinks": symlinks_to_json(dataset_symlinks),
"inputs": [
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
Expand Down Expand Up @@ -1495,7 +1476,6 @@ async def test_get_dataset_lineage_unmergeable_schema_and_output_type(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs),
"ancestors": [],
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
Expand Down Expand Up @@ -1611,7 +1591,6 @@ async def test_get_dataset_lineage_empty_io_stats_and_schema(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs),
"ancestors": [],
"symlinks": [],
"inputs": [
*inputs_to_json(merged_job_inputs, granularity="JOB"),
Expand Down Expand Up @@ -1680,7 +1659,6 @@ async def test_get_dataset_lineage_with_granularity_dataset_without_output_schem
assert response.json() == {
"relations": {
"parents": [],
"ancestors": [],
"symlinks": [],
"outputs": [],
"inputs": sorted(
Expand Down Expand Up @@ -1758,7 +1736,6 @@ async def test_get_dataset_lineage_with_granularity_dataset_ignore_self_referenc
assert response.json() == {
"relations": {
"parents": [],
"ancestors": [],
"symlinks": [],
"inputs": [],
"outputs": [],
Expand Down Expand Up @@ -1812,7 +1789,6 @@ async def test_get_dataset_lineage_with_granularity_dataset_ignore_not_connected
assert response.json() == {
"relations": {
"parents": [],
"ancestors": [],
"symlinks": [],
"inputs": [],
"outputs": [],
Expand Down Expand Up @@ -1892,7 +1868,6 @@ async def test_get_dataset_lineage_for_long_running_operations_with_granularity_
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs),
"ancestors": [],
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(inputs), granularity="JOB"),
Expand Down Expand Up @@ -1978,7 +1953,6 @@ async def test_get_dataset_lineage_for_long_running_operations_with_granularity_
assert response.json() == {
"relations": {
"parents": [],
"ancestors": [],
"symlinks": [],
"inputs": sorted(
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ async def test_get_lineage_missing_id(
assert response.json() == {
"relations": {
"parents": [],
"ancestors": [],
"symlinks": [],
"inputs": [],
"outputs": [],
Expand Down
Loading