diff --git a/data_rentgen/consumer/extractors/batch_extraction_result.py b/data_rentgen/consumer/extractors/batch_extraction_result.py
index 76b1c6c2..1b1d7ecf 100644
--- a/data_rentgen/consumer/extractors/batch_extraction_result.py
+++ b/data_rentgen/consumer/extractors/batch_extraction_result.py
@@ -10,6 +10,7 @@
DatasetDTO,
DatasetSymlinkDTO,
InputDTO,
+ JobDependencyDTO,
JobDTO,
JobTypeDTO,
LocationDTO,
@@ -29,6 +30,7 @@
DatasetDTO,
DatasetSymlinkDTO,
InputDTO,
+ JobDependencyDTO,
JobDTO,
JobTypeDTO,
LocationDTO,
@@ -67,6 +69,7 @@ def __init__(self):
self._dataset_symlinks: dict[tuple, DatasetSymlinkDTO] = {}
self._job_types: dict[tuple, JobTypeDTO] = {}
self._jobs: dict[tuple, JobDTO] = {}
+ self._job_dependencies: dict[tuple, JobDependencyDTO] = {}
self._runs: dict[tuple, RunDTO] = {}
self._operations: dict[tuple, OperationDTO] = {}
self._inputs: dict[tuple, InputDTO] = {}
@@ -86,6 +89,7 @@ def __repr__(self):
f"dataset_symlinks={len(self._dataset_symlinks)}, "
f"job_types={len(self._job_types)}, "
f"jobs={len(self._jobs)}, "
+ f"job_dependencies={len(self._job_dependencies)}, "
f"runs={len(self._runs)}, "
f"operations={len(self._operations)}, "
f"inputs={len(self._inputs)}, "
@@ -139,12 +143,19 @@ def add_job(self, job: JobDTO):
job.tag_values = {self.add_tag_value(tag_value) for tag_value in job.tag_values}
return self._add(self._jobs, job)
+ def add_job_dependency(self, job_dependency: JobDependencyDTO):
+ job_dependency.from_job = self.add_job(job_dependency.from_job)
+ job_dependency.to_job = self.add_job(job_dependency.to_job)
+ return self._add(self._job_dependencies, job_dependency)
+
def add_run(self, run: RunDTO):
run.job = self.add_job(run.job)
if run.parent_run:
run.parent_run = self.add_run(run.parent_run)
if run.user:
run.user = self.add_user(run.user)
+ for job_dependency in run.job_dependencies:
+ self.add_job_dependency(job_dependency)
return self._add(self._runs, run)
def add_operation(self, operation: OperationDTO):
@@ -232,6 +243,12 @@ def get_job(self, job_key: tuple) -> JobDTO:
job.tag_values = {self.get_tag_value(tag_value.unique_key) for tag_value in job.tag_values}
return job
+ def get_job_dependency(self, job_dependency_key: tuple) -> JobDependencyDTO:
+ job_dependency = self._job_dependencies[job_dependency_key]
+ job_dependency.from_job = self.get_job(job_dependency.from_job.unique_key)
+ job_dependency.to_job = self.get_job(job_dependency.to_job.unique_key)
+ return job_dependency
+
def get_run(self, run_key: tuple) -> RunDTO:
run = self._runs[run_key]
run.job = self.get_job(run.job.unique_key)
@@ -290,6 +307,9 @@ def job_types(self) -> list[JobTypeDTO]:
def jobs(self) -> list[JobDTO]:
return self._resolve(self.get_job, self._jobs)
+ def job_dependencies(self) -> list[JobDependencyDTO]:
+ return self._resolve(self.get_job_dependency, self._job_dependencies)
+
def runs(self) -> list[RunDTO]:
return self._resolve(self.get_run, self._runs)
diff --git a/data_rentgen/consumer/extractors/generic/job.py b/data_rentgen/consumer/extractors/generic/job.py
index 999a5f0b..dd719d7a 100644
--- a/data_rentgen/consumer/extractors/generic/job.py
+++ b/data_rentgen/consumer/extractors/generic/job.py
@@ -13,6 +13,7 @@
)
from data_rentgen.openlineage.job import OpenLineageJob
from data_rentgen.openlineage.run_facets import (
+ OpenLineageJobIdentifier,
OpenLineageParentJob,
)
@@ -29,7 +30,10 @@ def extract_job(self, job: OpenLineageJob) -> JobDTO:
)
return self._enrich_job_tags(job_dto, job)
- def extract_parent_job(self, job: OpenLineageJob | OpenLineageParentJob) -> JobDTO:
+ def extract_pure_job(
+ self,
+ job: OpenLineageJob | OpenLineageParentJob | OpenLineageJobIdentifier,
+ ) -> JobDTO:
"""
Extract JobDTO from parent job reference
"""
@@ -38,7 +42,10 @@ def extract_parent_job(self, job: OpenLineageJob | OpenLineageParentJob) -> JobD
location=self._extract_job_location(job),
)
- def _extract_job_location(self, job: OpenLineageJob | OpenLineageParentJob) -> LocationDTO:
+ def _extract_job_location(
+ self,
+ job: OpenLineageJob | OpenLineageParentJob | OpenLineageJobIdentifier,
+ ) -> LocationDTO:
# hostname and scheme are normalized to lowercase for uniqueness
url = urlparse(job.namespace.lower())
scheme = url.scheme or "unknown"
diff --git a/data_rentgen/consumer/extractors/generic/run.py b/data_rentgen/consumer/extractors/generic/run.py
index 9d6e9979..e0deca7f 100644
--- a/data_rentgen/consumer/extractors/generic/run.py
+++ b/data_rentgen/consumer/extractors/generic/run.py
@@ -5,6 +5,7 @@
from abc import ABC, abstractmethod
from data_rentgen.dto import (
+ JobDependencyDTO,
JobDTO,
RunDTO,
RunStatusDTO,
@@ -17,10 +18,11 @@
OpenLineageRunEventType,
)
from data_rentgen.openlineage.run_facets import (
+ OpenLineageJobIdentifier,
OpenLineageParentJob,
OpenLineageParentRunFacet,
+ OpenLineageRunTagsFacetField,
)
-from data_rentgen.openlineage.run_facets.run_tags import OpenLineageRunTagsFacetField
class RunExtractorMixin(ABC):
@@ -29,18 +31,14 @@ def extract_job(self, job: OpenLineageJob) -> JobDTO:
pass
@abstractmethod
- def extract_parent_job(self, job: OpenLineageJob | OpenLineageParentJob) -> JobDTO:
+ def extract_pure_job(self, job: OpenLineageJob | OpenLineageParentJob | OpenLineageJobIdentifier) -> JobDTO:
pass
def extract_run(self, event: OpenLineageRunEvent) -> RunDTO:
"""
Extract RunDTO from specific event
"""
- run = RunDTO(
- id=event.run.runId, # type: ignore [arg-type]
- job=self.extract_job(event.job),
- parent_run=self.extract_parent_run(event.run.facets.parent) if event.run.facets.parent else None,
- )
+ run = self.extract_pure_run(event)
if run.parent_run:
run.job.parent_job = run.parent_run.job
self._enrich_run_status(run, event)
@@ -49,15 +47,23 @@ def extract_run(self, event: OpenLineageRunEvent) -> RunDTO:
self._add_openlineage_client_version_tag(run, event)
self._enrich_run_tags(run, event)
self._enrich_nominal_times(run, event)
+ self._enrich_job_dependencies(run, event)
return run
+ def extract_pure_run(self, event: OpenLineageRunEvent) -> RunDTO:
+ return RunDTO(
+ id=event.run.runId, # type: ignore [arg-type]
+ job=self.extract_job(event.job),
+ parent_run=self.extract_parent_run(event.run.facets.parent) if event.run.facets.parent else None,
+ )
+
def extract_parent_run(self, facet: OpenLineageParentRunFacet | OpenLineageRunEvent) -> RunDTO:
"""
Extract RunDTO from parent run reference
"""
return RunDTO(
id=facet.run.runId,
- job=self.extract_parent_job(facet.job),
+ job=self.extract_pure_job(facet.job),
)
def _enrich_run_status(self, run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
@@ -146,3 +152,27 @@ def _enrich_nominal_times(self, run: RunDTO, event: OpenLineageRunEvent) -> RunD
run.expected_end_at = None
return run
+
+ def _enrich_job_dependencies(self, run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
+ if not event.run.facets.jobDependencies:
+ return run
+
+ for upstream in event.run.facets.jobDependencies.upstream:
+ run.job_dependencies.append(
+ JobDependencyDTO(
+ from_job=self.extract_pure_job(upstream.job),
+ to_job=run.job,
+ type=upstream.dependency_type,
+ ),
+ )
+
+ for downstream in event.run.facets.jobDependencies.downstream:
+ run.job_dependencies.append(
+ JobDependencyDTO(
+ from_job=run.job,
+ to_job=self.extract_pure_job(downstream.job),
+ type=downstream.dependency_type,
+ ),
+ )
+
+ return run
diff --git a/data_rentgen/consumer/extractors/impl/airflow_task.py b/data_rentgen/consumer/extractors/impl/airflow_task.py
index 8a5f99d4..d9f04b08 100644
--- a/data_rentgen/consumer/extractors/impl/airflow_task.py
+++ b/data_rentgen/consumer/extractors/impl/airflow_task.py
@@ -8,7 +8,8 @@
from data_rentgen.consumer.extractors.generic import GenericExtractor
from data_rentgen.consumer.extractors.impl.utils import parse_kv_tag
-from data_rentgen.dto import JobDTO, OperationDTO, RunDTO, RunStartReasonDTO, UserDTO
+from data_rentgen.dto import JobDTO, JobTypeDTO, OperationDTO, RunDTO, RunStartReasonDTO, UserDTO
+from data_rentgen.dto import JobDependencyDTO
from data_rentgen.openlineage.job import OpenLineageJob
from data_rentgen.openlineage.job_facets import OpenLineageJobTagsFacetField
from data_rentgen.openlineage.run_event import OpenLineageRunEvent
@@ -211,3 +212,36 @@ def _enrich_run_tags(self, run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
# facets are immutable
object.__setattr__(event.run.facets, "tags", OpenLineageRunTagsFacet(tags=run_tags))
return super()._enrich_run_tags(run, event)
+
+ def _enrich_job_dependencies(self, run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
+ run = super()._enrich_job_dependencies(run, event)
+ if event.run.facets.airflow:
+ # https://github.com/apache/airflow/pull/59521 brings up jobDependency facet,
+ # but it still doesn't contain direct task -> task dependencies
+ dag_info = event.run.facets.airflow.dag
+ task_info = event.run.facets.airflow.task
+ for upstream_task_id in task_info.upstream_task_ids:
+ run.job_dependencies.append(
+ JobDependencyDTO(
+ from_job=JobDTO(
+ name=f"{dag_info.dag_id}.{upstream_task_id}",
+ location=run.job.location,
+                    type=JobTypeDTO(type="AIRFLOW_TASK"),
+ ),
+ to_job=run.job,
+ type="DIRECT_DEPENDENCY",
+ ),
+ )
+ for downstream_task_id in task_info.downstream_task_ids:
+ run.job_dependencies.append(
+ JobDependencyDTO(
+ from_job=run.job,
+ to_job=JobDTO(
+ name=f"{dag_info.dag_id}.{downstream_task_id}",
+ location=run.job.location,
+                    type=JobTypeDTO(type="AIRFLOW_TASK"),
+ ),
+ type="DIRECT_DEPENDENCY",
+ ),
+ )
+ return run
diff --git a/data_rentgen/consumer/extractors/impl/hive.py b/data_rentgen/consumer/extractors/impl/hive.py
index d1ebca87..6f196d1b 100644
--- a/data_rentgen/consumer/extractors/impl/hive.py
+++ b/data_rentgen/consumer/extractors/impl/hive.py
@@ -34,7 +34,7 @@ def is_operation(self, event: OpenLineageRunEvent) -> bool:
# All events produced by Hive integration are queries == operations
return True
- def extract_run(self, event: OpenLineageRunEvent) -> RunDTO:
+ def extract_pure_run(self, event: OpenLineageRunEvent) -> RunDTO:
# Hive produce only hive query events, but no events for hive session:
# https://github.com/OpenLineage/OpenLineage/issues/3784
# But we treat queries as operations, and operations should be bound to run (session) for grouping.
@@ -55,7 +55,7 @@ def extract_run(self, event: OpenLineageRunEvent) -> RunDTO:
if hive_session.username not in ("anonymous", "hive"):
user = UserDTO(name=hive_session.username)
- run = RunDTO(
+ return RunDTO(
id=run_id,
job=JobDTO(
name=job_name,
@@ -68,12 +68,8 @@ def extract_run(self, event: OpenLineageRunEvent) -> RunDTO:
external_id=hive_session.sessionId,
user=user,
)
- if run.parent_run:
- run.job.parent_job = run.parent_run.job
- self._add_engine_version_tag(run, event)
- self._add_openlineage_adapter_version_tag(run, event)
- self._add_openlineage_client_version_tag(run, event)
- self._enrich_run_tags(run, event)
+
+    def _enrich_run_status(self, run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:  # noop: Hive events carry no run status
return run
def extract_operation(self, event: OpenLineageRunEvent) -> OperationDTO:
diff --git a/data_rentgen/consumer/saver.py b/data_rentgen/consumer/saver.py
index f50be125..de09c65d 100644
--- a/data_rentgen/consumer/saver.py
+++ b/data_rentgen/consumer/saver.py
@@ -32,6 +32,7 @@ async def save(self, data: BatchExtractionResult):
await self.create_dataset_symlinks(data)
await self.create_job_types(data)
await self.create_jobs(data)
+ await self.create_job_dependencies(data)
await self.create_users(data)
await self.create_sql_queries(data)
await self.create_schemas(data)
@@ -113,6 +114,15 @@ async def create_jobs(self, data: BatchExtractionResult):
job = await self.unit_of_work.job.update(job, job_dto) # noqa: PLW2901
job_dto.id = job.id
+ async def create_job_dependencies(self, data: BatchExtractionResult):
+ self.logger.debug("Creating job dependencies")
+ job_dependency_pairs = await self.unit_of_work.job_dependency.fetch_bulk(data.job_dependencies())
+ for job_dependency_dto, job_dependency in job_dependency_pairs:
+ if not job_dependency:
+ async with self.unit_of_work:
+ job_dependency = await self.unit_of_work.job_dependency.create(job_dependency_dto) # noqa: PLW2901
+ job_dependency_dto.id = job_dependency.id
+
async def create_users(self, data: BatchExtractionResult):
self.logger.debug("Creating users")
user_pairs = await self.unit_of_work.user.fetch_bulk(data.users())
diff --git a/data_rentgen/db/migrations/versions/2026-03-06_4e119cb7481e_add_job_dependency_table.py b/data_rentgen/db/migrations/versions/2026-03-06_4e119cb7481e_add_job_dependency_table.py
new file mode 100644
index 00000000..01c83220
--- /dev/null
+++ b/data_rentgen/db/migrations/versions/2026-03-06_4e119cb7481e_add_job_dependency_table.py
@@ -0,0 +1,50 @@
+# SPDX-FileCopyrightText: 2024-present MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+"""Add job_dependency table
+
+Revision ID: 4e119cb7481e
+Revises: a1950f06a8cb
+Create Date: 2026-03-06 17:39:31.296534
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "4e119cb7481e"
+down_revision = "a1950f06a8cb"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+ op.create_table(
+ "job_dependency",
+ sa.Column("id", sa.BigInteger(), nullable=False),
+ sa.Column("from_job_id", sa.BigInteger(), nullable=False),
+ sa.Column("to_job_id", sa.BigInteger(), nullable=False),
+ sa.Column("type", sa.String(), nullable=True),
+ sa.ForeignKeyConstraint(
+ ["from_job_id"],
+ ["job.id"],
+ name=op.f("fk__job_dependency__from_job_id__job"),
+ ondelete="CASCADE",
+ ),
+ sa.ForeignKeyConstraint(
+ ["to_job_id"],
+ ["job.id"],
+ name=op.f("fk__job_dependency__to_job_id__job"),
+ ondelete="CASCADE",
+ ),
+ sa.PrimaryKeyConstraint("id", name=op.f("pk__job_dependency")),
+ sa.UniqueConstraint("from_job_id", "to_job_id", name=op.f("uq__job_dependency__from_job_id_to_job_id")),
+ )
+ op.create_index(op.f("ix__job_dependency__from_job_id"), "job_dependency", ["from_job_id"], unique=False)
+ op.create_index(op.f("ix__job_dependency__to_job_id"), "job_dependency", ["to_job_id"], unique=False)
+
+
+def downgrade() -> None:
+ op.drop_index(op.f("ix__job_dependency__to_job_id"), table_name="job_dependency")
+ op.drop_index(op.f("ix__job_dependency__from_job_id"), table_name="job_dependency")
+ op.drop_table("job_dependency")
diff --git a/data_rentgen/db/models/__init__.py b/data_rentgen/db/models/__init__.py
index 4a2bbe21..091963e5 100644
--- a/data_rentgen/db/models/__init__.py
+++ b/data_rentgen/db/models/__init__.py
@@ -14,6 +14,7 @@
from data_rentgen.db.models.dataset_symlink import DatasetSymlink, DatasetSymlinkType
from data_rentgen.db.models.input import Input
from data_rentgen.db.models.job import Job, JobTagValue
+from data_rentgen.db.models.job_dependency import JobDependency
from data_rentgen.db.models.job_type import JobType
from data_rentgen.db.models.location import Location
from data_rentgen.db.models.operation import Operation, OperationStatus, OperationType
@@ -40,6 +41,7 @@
"DatasetTagValue",
"Input",
"Job",
+ "JobDependency",
"JobLastRun",
"JobTagValue",
"JobType",
diff --git a/data_rentgen/db/models/job_dependency.py b/data_rentgen/db/models/job_dependency.py
new file mode 100644
index 00000000..f79d67b5
--- /dev/null
+++ b/data_rentgen/db/models/job_dependency.py
@@ -0,0 +1,47 @@
+# SPDX-FileCopyrightText: 2024-present MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+
+from sqlalchemy import BigInteger, ForeignKey, String, UniqueConstraint
+from sqlalchemy.orm import Mapped, mapped_column, relationship
+
+from data_rentgen.db.models.base import Base
+from data_rentgen.db.models.job import Job
+
+
+class JobDependency(Base):
+ __tablename__ = "job_dependency"
+ __table_args__ = (UniqueConstraint("from_job_id", "to_job_id"),)
+
+ id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
+ from_job_id: Mapped[int] = mapped_column(
+ BigInteger,
+ ForeignKey("job.id", ondelete="CASCADE"),
+ index=True,
+ nullable=False,
+ doc="From job id",
+ )
+ from_job: Mapped[Job] = relationship(
+ Job,
+ lazy="noload",
+ foreign_keys=[from_job_id],
+ )
+
+ to_job_id: Mapped[int] = mapped_column(
+ BigInteger,
+ ForeignKey("job.id", ondelete="CASCADE"),
+ index=True,
+ nullable=False,
+ doc="To job id",
+ )
+ to_job: Mapped[Job] = relationship(
+ Job,
+ lazy="noload",
+ foreign_keys=[to_job_id],
+ )
+
+    type: Mapped[str | None] = mapped_column(
+ String,
+ nullable=True,
+ doc="Dependency type",
+ )
diff --git a/data_rentgen/db/repositories/job_dependency.py b/data_rentgen/db/repositories/job_dependency.py
new file mode 100644
index 00000000..74a6cbc5
--- /dev/null
+++ b/data_rentgen/db/repositories/job_dependency.py
@@ -0,0 +1,76 @@
+# SPDX-FileCopyrightText: 2024-present MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+
+from sqlalchemy import ARRAY, BigInteger, bindparam, cast, func, select, tuple_
+
+from data_rentgen.db.models.job_dependency import JobDependency
+from data_rentgen.db.repositories.base import Repository
+from data_rentgen.dto import JobDependencyDTO
+
+fetch_bulk_query = select(JobDependency).where(
+    tuple_(JobDependency.from_job_id, JobDependency.to_job_id).in_(
+        select(
+            func.unnest(
+                cast(bindparam("from_job_ids"), ARRAY(BigInteger())),
+                cast(bindparam("to_job_ids"), ARRAY(BigInteger())),
+ )
+ .table_valued("from_job_ids", "to_job_ids")
+ .render_derived(),
+ ),
+ ),
+)
+
+get_one_query = select(JobDependency).where(
+ JobDependency.from_job_id == bindparam("from_job_id"),
+ JobDependency.to_job_id == bindparam("to_job_id"),
+)
+
+
+class JobDependencyRepository(Repository[JobDependency]):
+ async def fetch_bulk(
+ self,
+ job_dependencies_dto: list[JobDependencyDTO],
+ ) -> list[tuple[JobDependencyDTO, JobDependency | None]]:
+ if not job_dependencies_dto:
+ return []
+
+ scalars = await self._session.scalars(
+ fetch_bulk_query,
+ {
+ "from_job_ids": [item.from_job.id for item in job_dependencies_dto],
+ "to_job_ids": [item.to_job.id for item in job_dependencies_dto],
+ },
+ )
+ existing = {(item.from_job_id, item.to_job_id): item for item in scalars.all()}
+ return [
+ (
+ dto,
+ existing.get((dto.from_job.id, dto.to_job.id)), # type: ignore[arg-type]
+ )
+ for dto in job_dependencies_dto
+ ]
+
+ async def create(self, job_dependency: JobDependencyDTO) -> JobDependency:
+ # if another worker already created the same row, just use it. if not - create with holding the lock.
+ await self._lock(job_dependency.from_job.id, job_dependency.to_job.id)
+ return await self._get(job_dependency) or await self._create(job_dependency)
+
+ async def _get(self, job_dependency: JobDependencyDTO) -> JobDependency | None:
+ return await self._session.scalar(
+ get_one_query,
+ {
+ "from_job_id": job_dependency.from_job.id,
+ "to_job_id": job_dependency.to_job.id,
+ },
+ )
+
+ async def _create(self, job_dependency: JobDependencyDTO) -> JobDependency:
+ result = JobDependency(
+ from_job_id=job_dependency.from_job.id,
+ to_job_id=job_dependency.to_job.id,
+ type=job_dependency.type,
+ )
+ self._session.add(result)
+ await self._session.flush([result])
+ return result
diff --git a/data_rentgen/db/scripts/seed/dbt.py b/data_rentgen/db/scripts/seed/dbt.py
index e28d78f2..0d5631a2 100644
--- a/data_rentgen/db/scripts/seed/dbt.py
+++ b/data_rentgen/db/scripts/seed/dbt.py
@@ -9,6 +9,7 @@
from faker import Faker
from data_rentgen.consumer.extractors import BatchExtractionResult
+from data_rentgen.db.scripts.seed.airflow import generate_airflow_run
from data_rentgen.dto import (
ColumnLineageDTO,
DatasetColumnRelationDTO,
@@ -92,10 +93,21 @@ def generate_dbt_run(
start: datetime,
end: datetime,
) -> BatchExtractionResult:
+ run_created_at = faker.date_time_between(start, end, tzinfo=UTC)
+ run_started_at = run_created_at + timedelta(minutes=faker.pyfloat(min_value=0, max_value=3))
+ run_ended_at = run_started_at + timedelta(minutes=faker.pyfloat(min_value=30, max_value=35))
+ parent_run = generate_airflow_run(
+ "mart_layer_dag",
+ "mart_layer_task_dbt",
+ run_created_at - timedelta(seconds=faker.pyint(min_value=5, max_value=10)),
+ run_ended_at + timedelta(seconds=faker.pyint(min_value=5, max_value=10)),
+ )
+
job = JobDTO(
name="dbt-run-user_metrics",
location=LOCATIONS["local"],
type=JobTypeDTO(type="DBT_JOB"),
+ parent_job=parent_run.job,
tag_values={
TagValueDTO(
tag=TagDTO(name="dbt.version"),
@@ -116,13 +128,11 @@ def generate_dbt_run(
},
)
- run_created_at = faker.date_time_between(start, end, tzinfo=UTC)
- run_started_at = run_created_at + timedelta(minutes=faker.pyfloat(min_value=0, max_value=3))
- run_ended_at = run_started_at + timedelta(minutes=faker.pyfloat(min_value=30, max_value=35))
run_id = generate_new_uuid(run_created_at)
run = RunDTO(
id=run_id,
job=job,
+ parent_run=parent_run,
status=RunStatusDTO.SUCCEEDED,
external_id=str(uuid4()),
started_at=run_started_at,
diff --git a/data_rentgen/db/scripts/seed/spark_yarn.py b/data_rentgen/db/scripts/seed/spark_yarn.py
index 2244128c..37381e5d 100644
--- a/data_rentgen/db/scripts/seed/spark_yarn.py
+++ b/data_rentgen/db/scripts/seed/spark_yarn.py
@@ -17,6 +17,7 @@
DatasetSymlinkDTO,
DatasetSymlinkTypeDTO,
InputDTO,
+ JobDependencyDTO,
JobDTO,
JobTypeDTO,
LocationDTO,
@@ -179,9 +180,9 @@ def generate_spark_run_yarn(
run_created_at = faker.date_time_between(start, end, tzinfo=UTC)
run_started_at = run_created_at + timedelta(minutes=faker.pyfloat(min_value=0, max_value=3))
run_ended_at = run_started_at + timedelta(minutes=faker.pyfloat(min_value=10, max_value=12))
- paren_run = generate_airflow_run(
+ parent_run = generate_airflow_run(
"mart_layer_dag",
- "mart_layer_task",
+ "mart_layer_task_spark",
run_created_at - timedelta(seconds=faker.pyint(min_value=5, max_value=10)),
run_ended_at + timedelta(seconds=faker.pyint(min_value=5, max_value=10)),
)
@@ -190,7 +191,7 @@ def generate_spark_run_yarn(
name="mart_layer_loader",
location=LOCATIONS["yarn"],
type=JobTypeDTO(type="SPARK_APPLICATION"),
- parent_job=paren_run.job,
+ parent_job=parent_run.job,
tag_values={
TagValueDTO(
tag=TagDTO(name="spark.version"),
@@ -207,13 +208,27 @@ def generate_spark_run_yarn(
},
)
+    # add Airflow Task mart_layer_dag.mart_layer_task_dbt -> mart_layer_dag.mart_layer_task_spark dependency
+    parent_run.job_dependencies.append(
+        JobDependencyDTO(
+            from_job=JobDTO(
+                name="mart_layer_dag.mart_layer_task_dbt",
+                parent_job=parent_run.job.parent_job,  # DAG
+                location=parent_run.job.location,
+                type=JobTypeDTO(type="AIRFLOW_TASK"),
+ ),
+ to_job=job,
+ type="DIRECT_DEPENDENCY",
+ )
+ )
+
run_id = generate_new_uuid(run_created_at)
external_id = f"application_{run_created_at.timestamp() * 1000}_0001"
run = RunDTO(
id=run_id,
job=job,
status=RunStatusDTO.SUCCEEDED,
- parent_run=paren_run,
+ parent_run=parent_run,
external_id=external_id,
running_log_url=f"http://{faker.ipv4_private()}:{faker.port_number(is_user=True)}",
persistent_log_url=f"http://mn01.hadoop.companyname.com:8088/proxy/{external_id}",
diff --git a/data_rentgen/dto/__init__.py b/data_rentgen/dto/__init__.py
index 3cc5a8be..74ae98c5 100644
--- a/data_rentgen/dto/__init__.py
+++ b/data_rentgen/dto/__init__.py
@@ -10,6 +10,7 @@
from data_rentgen.dto.dataset_symlink import DatasetSymlinkDTO, DatasetSymlinkTypeDTO
from data_rentgen.dto.input import InputDTO
from data_rentgen.dto.job import JobDTO
+from data_rentgen.dto.job_dependency import JobDependencyDTO
from data_rentgen.dto.job_type import JobTypeDTO
from data_rentgen.dto.location import LocationDTO
from data_rentgen.dto.operation import (
@@ -34,6 +35,7 @@
"DatasetSymlinkTypeDTO",
"InputDTO",
"JobDTO",
+ "JobDependencyDTO",
"JobTypeDTO",
"LocationDTO",
"OperationDTO",
diff --git a/data_rentgen/dto/job_dependency.py b/data_rentgen/dto/job_dependency.py
new file mode 100644
index 00000000..aed6fa4c
--- /dev/null
+++ b/data_rentgen/dto/job_dependency.py
@@ -0,0 +1,26 @@
+# SPDX-FileCopyrightText: 2024-present MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+
+from data_rentgen.dto.job import JobDTO
+
+
+@dataclass(slots=True)
+class JobDependencyDTO:
+ from_job: JobDTO
+ to_job: JobDTO
+ type: str | None = None
+ id: int | None = field(default=None, compare=False)
+
+ @property
+ def unique_key(self) -> tuple:
+ return (self.from_job.unique_key, self.to_job.unique_key, self.type)
+
+ def merge(self, new: JobDependencyDTO) -> JobDependencyDTO:
+ self.from_job = self.from_job.merge(new.from_job)
+ self.to_job = self.to_job.merge(new.to_job)
+ self.id = new.id or self.id
+ return self
diff --git a/data_rentgen/dto/run.py b/data_rentgen/dto/run.py
index c89573fc..45a73d25 100644
--- a/data_rentgen/dto/run.py
+++ b/data_rentgen/dto/run.py
@@ -9,6 +9,7 @@
from uuid import UUID
from data_rentgen.dto.job import JobDTO
+from data_rentgen.dto.job_dependency import JobDependencyDTO
from data_rentgen.dto.user import UserDTO
from data_rentgen.utils.uuid import extract_timestamp_from_uuid
@@ -46,6 +47,7 @@ class RunDTO:
persistent_log_url: str | None = None
expected_start_at: datetime | None = None
expected_end_at: datetime | None = None
+ job_dependencies: list[JobDependencyDTO] = field(default_factory=list)
def __post_init__(self):
self.created_at = extract_timestamp_from_uuid(self.id)
@@ -66,6 +68,15 @@ def merge(self, new: RunDTO) -> RunDTO:
else:
self.user = new.user or self.user
+ existing_dependencies = {item.unique_key: item for item in self.job_dependencies}
+ merged_dependencies = []
+ for job_dependency in new.job_dependencies:
+ if job_dependency.unique_key in existing_dependencies:
+ merged_dependencies.append(existing_dependencies[job_dependency.unique_key].merge(job_dependency))
+ else:
+ merged_dependencies.append(job_dependency)
+ self.job_dependencies = merged_dependencies
+
self.status = max(new.status, self.status)
self.started_at = new.started_at or self.started_at
self.start_reason = new.start_reason or self.start_reason
diff --git a/data_rentgen/openlineage/run_facets/__init__.py b/data_rentgen/openlineage/run_facets/__init__.py
index 482fd143..d5b0dc35 100644
--- a/data_rentgen/openlineage/run_facets/__init__.py
+++ b/data_rentgen/openlineage/run_facets/__init__.py
@@ -21,6 +21,12 @@
)
from data_rentgen.openlineage.run_facets.hive_query import OpenLineageHiveQueryInfoRunFacet
from data_rentgen.openlineage.run_facets.hive_session import OpenLineageHiveSessionInfoRunFacet
+from data_rentgen.openlineage.run_facets.job_dependency import (
+ OpenLineageJobDependenciesRunFacet,
+ OpenLineageJobDependency,
+ OpenLineageJobIdentifier,
+ OpenLineageRunIdentifier,
+)
from data_rentgen.openlineage.run_facets.nominal_time import OpenLineageNominalTimeRunFacet
from data_rentgen.openlineage.run_facets.parent_run import (
OpenLineageParentJob,
@@ -52,6 +58,9 @@
"OpenLineageAirflowTaskRunFacet",
"OpenLineageDbtRunRunFacet",
"OpenLineageFlinkJobDetailsRunFacet",
+ "OpenLineageJobDependenciesRunFacet",
+ "OpenLineageJobDependency",
+ "OpenLineageJobIdentifier",
"OpenLineageNominalTimeRunFacet",
"OpenLineageParentJob",
"OpenLineageParentRun",
@@ -59,6 +68,7 @@
"OpenLineageProcessingEngineRunFacet",
"OpenLineageRunFacet",
"OpenLineageRunFacets",
+ "OpenLineageRunIdentifier",
"OpenLineageRunTagsFacet",
"OpenLineageRunTagsFacetField",
"OpenLineageSparkApplicationDetailsRunFacet",
@@ -86,3 +96,4 @@ class OpenLineageRunFacets(OpenLineageBase):
hive_query: OpenLineageHiveQueryInfoRunFacet | None = None
hive_session: OpenLineageHiveSessionInfoRunFacet | None = None
nominalTime: OpenLineageNominalTimeRunFacet | None = None
+ jobDependencies: OpenLineageJobDependenciesRunFacet | None = None
diff --git a/data_rentgen/openlineage/run_facets/airflow.py b/data_rentgen/openlineage/run_facets/airflow.py
index 641521af..472b571b 100644
--- a/data_rentgen/openlineage/run_facets/airflow.py
+++ b/data_rentgen/openlineage/run_facets/airflow.py
@@ -81,6 +81,29 @@ class OpenLineageAirflowTaskInfo(OpenLineageBase):
task_id: str = Field(examples=["my_task"])
operator_class: str | None = Field(default=None, examples=["MyOperator"])
task_group: OpenLineageAirflowTaskGroupInfo | None = None
+ downstream_task_ids: list[str] = Field(default_factory=list, examples=["downstream_task1", "downstream_task2"])
+ upstream_task_ids: list[str] = Field(default_factory=list, examples=["upstream_task1", "upstream_task2"])
+
+ @field_validator("downstream_task_ids", "upstream_task_ids", mode="before")
+ @classmethod
+ def _validate_task_ids(cls, value: list[str] | str) -> list[str]:
+        """Normalize ``upstream_task_ids``/``downstream_task_ids`` to a list of strings.
+
+        Depending on the integration version, task ids could be passed in formats:
+        * ``["task1", "task2"]`` - as expected by JSON spec
+        * ``"['task1', 'task2']"`` - `str(list)` representation
+        * ``'["task1", "task2"]'`` - `str(list)` representation if value contains single quotes
+
+ See:
+ * https://github.com/apache/airflow/pull/40371
+ * https://github.com/apache/airflow/pull/40854
+ * https://github.com/apache/airflow/pull/41786
+ """
+ if isinstance(value, list):
+ return value
+ with suppress(SyntaxError):
+ return ast.literal_eval(value)
+ return []
class OpenLineageAirflowTaskInstanceInfo(OpenLineageBase):
diff --git a/data_rentgen/openlineage/run_facets/job_dependency.py b/data_rentgen/openlineage/run_facets/job_dependency.py
new file mode 100644
index 00000000..94f08d4a
--- /dev/null
+++ b/data_rentgen/openlineage/run_facets/job_dependency.py
@@ -0,0 +1,62 @@
+# SPDX-FileCopyrightText: 2024-present MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from pydantic import UUID7, Field
+
+from data_rentgen.openlineage.base import OpenLineageBase
+from data_rentgen.openlineage.run_facets.base import OpenLineageRunFacet
+
+
+class OpenLineageJobIdentifier(OpenLineageBase):
+ """Job identifier.
+ See [JobDependenciesRunFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/JobDependenciesRunFacet.json).
+ """
+
+ namespace: str = Field(examples=["http://my-airflow.domain.com:8081"], json_schema_extra={"format": "uri"})
+ name: str = Field(examples=["my_dag.my_task"])
+
+
+class OpenLineageRunIdentifier(OpenLineageBase):
+ """Run identifier.
+ See [JobDependenciesRunFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/JobDependenciesRunFacet.json).
+ """
+
+ runId: UUID7 = Field(examples=["019867d4-1b59-71fe-bc30-3fbd38703700"])
+
+
+class OpenLineageJobDependency(OpenLineageRunFacet):
+ """Job dependency item.
+ See [JobDependenciesRunFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/JobDependenciesRunFacet.json).
+ """
+
+ job: OpenLineageJobIdentifier
+ run: OpenLineageRunIdentifier | None = None
+ dependency_type: str | None = Field(
+ default=None,
+ examples=["DIRECT_INVOCATION", "IMPLICIT_DEPENDENCY"],
+ description="Implementation specific",
+ )
+ sequence_trigger_rule: str | None = Field(
+ default=None,
+ examples=["FINISH_TO_START", "FINISH_TO_FINISH", "START_TO_START"],
+ description="Currently ignored",
+ )
+ status_trigger_rule: str | None = Field(
+ default=None,
+ examples=["EXECUTE_EVERY_TIME", "EXECUTE_ON_SUCCESS", "EXECUTE_ON_FAILURE"],
+ description="Currently ignored",
+ )
+
+
+class OpenLineageJobDependenciesRunFacet(OpenLineageRunFacet):
+ """Run facet describing job dependencies.
+ See [JobDependenciesRunFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/JobDependenciesRunFacet.json).
+ """
+
+ upstream: list[OpenLineageJobDependency] = Field(default_factory=list)
+ downstream: list[OpenLineageJobDependency] = Field(default_factory=list)
+ trigger_rule: str | None = Field(
+ default=None,
+ examples=["ALL_SUCCESS", "ALL_DONE", "ONE_SUCCESS", "NONE_FAILED"],
+ description="Currently ignored",
+ )
diff --git a/data_rentgen/services/uow.py b/data_rentgen/services/uow.py
index 58ea4fdd..ccac9c42 100644
--- a/data_rentgen/services/uow.py
+++ b/data_rentgen/services/uow.py
@@ -15,6 +15,7 @@
from data_rentgen.db.repositories.input import InputRepository
from data_rentgen.db.repositories.io_dataset_relation import IODatasetRelationRepository
from data_rentgen.db.repositories.job import JobRepository
+from data_rentgen.db.repositories.job_dependency import JobDependencyRepository
from data_rentgen.db.repositories.job_type import JobTypeRepository
from data_rentgen.db.repositories.location import LocationRepository
from data_rentgen.db.repositories.operation import OperationRepository
@@ -38,6 +39,7 @@ def __init__(
self.location = LocationRepository(session)
self.job_type = JobTypeRepository(session)
self.job = JobRepository(session)
+ self.job_dependency = JobDependencyRepository(session)
self.run = RunRepository(session)
self.operation = OperationRepository(session)
self.dataset = DatasetRepository(session)
diff --git a/docs/changelog/next_release/402.feature.rst b/docs/changelog/next_release/402.feature.rst
new file mode 100644
index 00000000..6c50b46f
--- /dev/null
+++ b/docs/changelog/next_release/402.feature.rst
@@ -0,0 +1,6 @@
+Extract information from the `jobDependencies <https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/JobDependenciesRunFacet.json>`_ facet, and store it in ``job_dependency`` table.
+For now this is just a simple tuple ``from_job_id``, ``to_job_id``, ``type`` (arbitrary string provided by integration, not enum). This can be changed in future versions of Data.Rentgen.
+
+Currently the only integration providing this kind of information is Airflow. But it is implemented only in the most recent versions of the OpenLineage provider for Airflow (``2.10`` or higher).
+For now the provider also doesn't send the facet with information about direct task -> task dependencies - only indirect ones (declared via Airflow ``Asset``) are included.
+So there is a fallback for Airflow which extracts these dependencies from ``downstream_task_ids`` and ``upstream_task_ids`` task fields.
diff --git a/docs/reference/database/structure.rst b/docs/reference/database/structure.rst
index d021cfd9..f04c6215 100644
--- a/docs/reference/database/structure.rst
+++ b/docs/reference/database/structure.rst
@@ -197,6 +197,14 @@ Database structure
revoked_at: timestamptz
}
+ entity job_dependency {
+ * id: bigint
+ ----
+        * from_job_id: bigint
+        * to_job_id: bigint
+ type: text null
+ }
+
address ||--o{ location
dataset ||--o{ location
@@ -232,9 +240,12 @@ Database structure
column_lineage "fingerprint" ||--o{ dataset_column_relation
tag_value ||--o{ tag
- dataset_tagvalue ||--o{ tag_value
- job_tagvalue ||--o{ tag_value
+ dataset_tag_value ||--o{ tag_value
+ job_tag_value ||--o{ tag_value
personal_token ||--o{ user
+ job_dependency "from_job_id" ||--o{ job
+ job_dependency "to_job_id" ||--o{ job
+
@enduml
diff --git a/tests/test_consumer/test_extractors/test_extractors_run_airflow.py b/tests/test_consumer/test_extractors/test_extractors_run_airflow.py
index 168c1e54..0aaeed78 100644
--- a/tests/test_consumer/test_extractors/test_extractors_run_airflow.py
+++ b/tests/test_consumer/test_extractors/test_extractors_run_airflow.py
@@ -18,6 +18,7 @@
TagValueDTO,
UserDTO,
)
+from data_rentgen.dto.job_dependency import JobDependencyDTO
from data_rentgen.openlineage.job import OpenLineageJob
from data_rentgen.openlineage.job_facets import (
OpenLineageJobFacets,
@@ -43,6 +44,12 @@
OpenLineageProcessingEngineRunFacet,
OpenLineageRunFacets,
)
+from data_rentgen.openlineage.run_facets.job_dependency import (
+ OpenLineageJobDependenciesRunFacet,
+ OpenLineageJobDependency,
+ OpenLineageJobIdentifier,
+ OpenLineageRunIdentifier,
+)
from data_rentgen.openlineage.run_facets.run_tags import OpenLineageRunTagsFacet, OpenLineageRunTagsFacetField
@@ -1485,3 +1492,320 @@ def test_extractors_extract_run_airflow_task_nominal_times(
expected_start_at=expected_start_at,
expected_end_at=expected_end_at,
)
+
+
+def test_extractors_extract_run_airflow_task_job_dependencies_ol_provider_2_10_plus():
+ # https://github.com/apache/airflow/pull/59521
+ now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
+ run_id = UUID("01908223-0782-79b8-9495-b1c38aaee839")
+ run = OpenLineageRunEvent(
+ eventType=OpenLineageRunEventType.COMPLETE,
+ eventTime=now,
+ job=OpenLineageJob(
+ namespace="http://airflow-host:8081",
+ name="mydag.mytask",
+ facets=OpenLineageJobFacets(
+ jobType=OpenLineageJobTypeJobFacet(
+ processingType=OpenLineageJobProcessingType.BATCH,
+ integration="AIRFLOW",
+ jobType="TASK",
+ ),
+ ),
+ ),
+ run=OpenLineageRun(
+ runId=run_id,
+ facets=OpenLineageRunFacets(
+ processing_engine=OpenLineageProcessingEngineRunFacet(
+ version=Version("2.9.2"),
+ name="Airflow",
+ openlineageAdapterVersion=Version("2.3.0"),
+ ),
+ airflow=OpenLineageAirflowTaskRunFacet(
+ dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
+ dagRun=OpenLineageAirflowDagRunInfo(
+ run_id="scheduled__2024-07-05T09:04:13:979349+00:00",
+ run_type=OpenLineageAirflowDagRunType.SCHEDULED,
+ data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
+ data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
+ ),
+ task=OpenLineageAirflowTaskInfo(
+ task_id="mytask",
+ operator_class="BashOperator",
+ ),
+ taskInstance=OpenLineageAirflowTaskInstanceInfo(
+ try_number=1,
+ ),
+ ),
+ jobDependencies=OpenLineageJobDependenciesRunFacet(
+ upstream=[
+ OpenLineageJobDependency(
+ job=OpenLineageJobIdentifier(
+ namespace="http://airflow-host:8081",
+ name="mydag.previous_task",
+ ),
+ run=OpenLineageRunIdentifier(
+ runId=UUID("01908223-0782-79b8-9495-b1c38aaee831"),
+ ),
+ dependency_type="IMPLICIT_DEPENDENCY",
+ sequence_trigger_rule="FINISH_TO_START",
+ status_trigger_rule="EXECUTE_EVERY_TIME",
+ ),
+ ],
+ downstream=[
+ OpenLineageJobDependency(
+ job=OpenLineageJobIdentifier(
+ namespace="http://airflow-host:8081",
+ name="mydag.next_task",
+ ),
+ dependency_type="IMPLICIT_DEPENDENCY",
+ sequence_trigger_rule="FINISH_TO_START",
+ status_trigger_rule="EXECUTE_EVERY_TIME",
+ ),
+ ],
+ trigger_rule="ALL_SUCCESS",
+ ),
+ ),
+ ),
+ )
+
+ assert AirflowTaskExtractor().extract_run(run) == RunDTO(
+ id=run_id,
+ job=JobDTO(
+ name="mydag.mytask",
+ location=LocationDTO(
+ type="http",
+ name="airflow-host:8081",
+ addresses={"http://airflow-host:8081"},
+ ),
+ type=JobTypeDTO(type="AIRFLOW_TASK"),
+ tag_values={
+ TagValueDTO(
+ tag=TagDTO(name="airflow.version"),
+ value="2.9.2",
+ ),
+ TagValueDTO(
+ tag=TagDTO(name="openlineage_adapter.version"),
+ value="2.3.0",
+ ),
+ },
+ ),
+ status=RunStatusDTO.SUCCEEDED,
+ started_at=None,
+ start_reason=RunStartReasonDTO.AUTOMATIC,
+ user=None,
+ ended_at=now,
+ external_id="scheduled__2024-07-05T09:04:13:979349+00:00",
+ attempt="1",
+ persistent_log_url=(
+ "http://airflow-host:8081/dags/mydag/grid?tab=logs&dag_run_id=scheduled__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00&task_id=mytask&map_index=-1"
+ ),
+ running_log_url=None,
+ job_dependencies=[
+ JobDependencyDTO(
+ from_job=JobDTO(
+ name="mydag.previous_task",
+ location=LocationDTO(
+ type="http",
+ name="airflow-host:8081",
+ addresses={"http://airflow-host:8081"},
+ ),
+ ),
+ to_job=JobDTO(
+ name="mydag.mytask",
+ location=LocationDTO(
+ type="http",
+ name="airflow-host:8081",
+ addresses={"http://airflow-host:8081"},
+ ),
+ type=JobTypeDTO(type="AIRFLOW_TASK"),
+ tag_values={
+ TagValueDTO(
+ tag=TagDTO(name="airflow.version"),
+ value="2.9.2",
+ ),
+ TagValueDTO(
+ tag=TagDTO(name="openlineage_adapter.version"),
+ value="2.3.0",
+ ),
+ },
+ ),
+ type="IMPLICIT_DEPENDENCY",
+ ),
+ JobDependencyDTO(
+ from_job=JobDTO(
+ name="mydag.mytask",
+ location=LocationDTO(
+ type="http",
+ name="airflow-host:8081",
+ addresses={"http://airflow-host:8081"},
+ ),
+ type=JobTypeDTO(type="AIRFLOW_TASK"),
+ tag_values={
+ TagValueDTO(
+ tag=TagDTO(name="airflow.version"),
+ value="2.9.2",
+ ),
+ TagValueDTO(
+ tag=TagDTO(name="openlineage_adapter.version"),
+ value="2.3.0",
+ ),
+ },
+ ),
+ to_job=JobDTO(
+ name="mydag.next_task",
+ location=LocationDTO(
+ type="http",
+ name="airflow-host:8081",
+ addresses={"http://airflow-host:8081"},
+ ),
+ ),
+ type="IMPLICIT_DEPENDENCY",
+ ),
+ ],
+ )
+
+
+def test_extractors_extract_run_airflow_task_job_dependencies_ol_provider_below_2_10():
+ now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc)
+ run_id = UUID("01908223-0782-79b8-9495-b1c38aaee839")
+ run = OpenLineageRunEvent(
+ eventType=OpenLineageRunEventType.COMPLETE,
+ eventTime=now,
+ job=OpenLineageJob(
+ namespace="http://airflow-host:8081",
+ name="mydag.mytask",
+ facets=OpenLineageJobFacets(
+ jobType=OpenLineageJobTypeJobFacet(
+ processingType=OpenLineageJobProcessingType.BATCH,
+ integration="AIRFLOW",
+ jobType="TASK",
+ ),
+ ),
+ ),
+ run=OpenLineageRun(
+ runId=run_id,
+ facets=OpenLineageRunFacets(
+ processing_engine=OpenLineageProcessingEngineRunFacet(
+ version=Version("2.9.2"),
+ name="Airflow",
+ openlineageAdapterVersion=Version("2.3.0"),
+ ),
+ airflow=OpenLineageAirflowTaskRunFacet(
+ dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"),
+ dagRun=OpenLineageAirflowDagRunInfo(
+ run_id="scheduled__2024-07-05T09:04:13:979349+00:00",
+ run_type=OpenLineageAirflowDagRunType.SCHEDULED,
+ data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
+ data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc),
+ ),
+ task=OpenLineageAirflowTaskInfo(
+ task_id="mytask",
+ operator_class="BashOperator",
+ downstream_task_ids=["next_task"],
+ upstream_task_ids=["previous_task"],
+ ),
+ taskInstance=OpenLineageAirflowTaskInstanceInfo(
+ try_number=1,
+ ),
+ ),
+ ),
+ ),
+ )
+
+ assert AirflowTaskExtractor().extract_run(run) == RunDTO(
+ id=run_id,
+ job=JobDTO(
+ name="mydag.mytask",
+ location=LocationDTO(
+ type="http",
+ name="airflow-host:8081",
+ addresses={"http://airflow-host:8081"},
+ ),
+ type=JobTypeDTO(type="AIRFLOW_TASK"),
+ tag_values={
+ TagValueDTO(
+ tag=TagDTO(name="airflow.version"),
+ value="2.9.2",
+ ),
+ TagValueDTO(
+ tag=TagDTO(name="openlineage_adapter.version"),
+ value="2.3.0",
+ ),
+ },
+ ),
+ status=RunStatusDTO.SUCCEEDED,
+ started_at=None,
+ start_reason=RunStartReasonDTO.AUTOMATIC,
+ user=None,
+ ended_at=now,
+ external_id="scheduled__2024-07-05T09:04:13:979349+00:00",
+ attempt="1",
+ persistent_log_url=(
+ "http://airflow-host:8081/dags/mydag/grid?tab=logs&dag_run_id=scheduled__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00&task_id=mytask&map_index=-1"
+ ),
+ running_log_url=None,
+ job_dependencies=[
+ JobDependencyDTO(
+ from_job=JobDTO(
+ name="mydag.previous_task",
+ location=LocationDTO(
+ type="http",
+ name="airflow-host:8081",
+ addresses={"http://airflow-host:8081"},
+ ),
+ type=JobTypeDTO(type="AIRFLOW_TASK"),
+ ),
+ to_job=JobDTO(
+ name="mydag.mytask",
+ location=LocationDTO(
+ type="http",
+ name="airflow-host:8081",
+ addresses={"http://airflow-host:8081"},
+ ),
+ type=JobTypeDTO(type="AIRFLOW_TASK"),
+ tag_values={
+ TagValueDTO(
+ tag=TagDTO(name="airflow.version"),
+ value="2.9.2",
+ ),
+ TagValueDTO(
+ tag=TagDTO(name="openlineage_adapter.version"),
+ value="2.3.0",
+ ),
+ },
+ ),
+ type="DIRECT_DEPENDENCY",
+ ),
+ JobDependencyDTO(
+ from_job=JobDTO(
+ name="mydag.mytask",
+ location=LocationDTO(
+ type="http",
+ name="airflow-host:8081",
+ addresses={"http://airflow-host:8081"},
+ ),
+ type=JobTypeDTO(type="AIRFLOW_TASK"),
+ tag_values={
+ TagValueDTO(
+ tag=TagDTO(name="airflow.version"),
+ value="2.9.2",
+ ),
+ TagValueDTO(
+ tag=TagDTO(name="openlineage_adapter.version"),
+ value="2.3.0",
+ ),
+ },
+ ),
+ to_job=JobDTO(
+ name="mydag.next_task",
+ location=LocationDTO(
+ type="http",
+ name="airflow-host:8081",
+ addresses={"http://airflow-host:8081"},
+ ),
+ type=JobTypeDTO(type="AIRFLOW_TASK"),
+ ),
+ type="DIRECT_DEPENDENCY",
+ ),
+ ],
+ )
diff --git a/tests/test_consumer/test_handlers/test_runs_handler_airflow.py b/tests/test_consumer/test_handlers/test_runs_handler_airflow.py
index 327cbf1b..54a58010 100644
--- a/tests/test_consumer/test_handlers/test_runs_handler_airflow.py
+++ b/tests/test_consumer/test_handlers/test_runs_handler_airflow.py
@@ -17,6 +17,7 @@
DatasetSymlink,
Input,
Job,
+ JobDependency,
Location,
Operation,
OperationStatus,
@@ -118,6 +119,21 @@ async def test_runs_handler_airflow(
assert {tv.tag.name: tv.value for tv in insert_task_job.tag_values} == expected_tag_values
assert insert_task_job.parent_job_id == dag_job.id
+ job_dependency_query = select(JobDependency).order_by(JobDependency.from_job_id)
+ job_dependency_scalars = await async_session.scalars(job_dependency_query)
+ job_dependencies = job_dependency_scalars.all()
+ assert len(job_dependencies) == 2
+
+ create_to_insert_job_dependency = job_dependencies[0]
+ assert create_to_insert_job_dependency.from_job_id == create_task_job.id
+ assert create_to_insert_job_dependency.to_job_id == insert_task_job.id
+ assert create_to_insert_job_dependency.type == "DIRECT_DEPENDENCY"
+
+ drop_to_create_job_dependency = job_dependencies[1]
+ assert drop_to_create_job_dependency.from_job_id == drop_task_job.id
+ assert drop_to_create_job_dependency.to_job_id == create_task_job.id
+ assert drop_to_create_job_dependency.type == "DIRECT_DEPENDENCY"
+
run_query = select(Run).order_by(Run.id)
run_scalars = await async_session.scalars(run_query)
runs = run_scalars.all()
diff --git a/tests/test_consumer/test_handlers/test_runs_handler_dbt.py b/tests/test_consumer/test_handlers/test_runs_handler_dbt.py
index 965affed..13c06237 100644
--- a/tests/test_consumer/test_handlers/test_runs_handler_dbt.py
+++ b/tests/test_consumer/test_handlers/test_runs_handler_dbt.py
@@ -17,6 +17,7 @@
DatasetSymlink,
Input,
Job,
+ JobDependency,
Location,
Operation,
OperationStatus,
@@ -88,6 +89,11 @@ async def test_runs_handler_dbt(
"openlineage_adapter.version": "1.34.0",
}
+ job_dependency_query = select(JobDependency).order_by(JobDependency.id)
+ job_dependency_scalars = await async_session.scalars(job_dependency_query)
+ job_dependencies = job_dependency_scalars.all()
+ assert not job_dependencies
+
run_query = select(Run).order_by(Run.id).options(selectinload(Run.started_by_user))
run_scalars = await async_session.scalars(run_query)
runs = run_scalars.all()
@@ -106,7 +112,7 @@ async def test_runs_handler_dbt(
assert run.running_log_url is None
assert run.persistent_log_url is None
- sql_query_query = select(SQLQuery).order_by(SQLQuery.id)
+ sql_query_query = select(SQLQuery).order_by(SQLQuery.fingerprint)
sql_euery_scalars = await async_session.scalars(sql_query_query)
sql_queries = sql_euery_scalars.all()
assert len(sql_queries) == 1
diff --git a/tests/test_consumer/test_handlers/test_runs_handler_flink.py b/tests/test_consumer/test_handlers/test_runs_handler_flink.py
index ba86492e..40c8c84f 100644
--- a/tests/test_consumer/test_handlers/test_runs_handler_flink.py
+++ b/tests/test_consumer/test_handlers/test_runs_handler_flink.py
@@ -16,6 +16,7 @@
DatasetSymlink,
Input,
Job,
+ JobDependency,
Location,
Operation,
OperationStatus,
@@ -85,6 +86,11 @@ async def test_runs_handler_flink(
"openlineage_adapter.version": "1.34.0",
}
+ job_dependency_query = select(JobDependency).order_by(JobDependency.id)
+ job_dependency_scalars = await async_session.scalars(job_dependency_query)
+ job_dependencies = job_dependency_scalars.all()
+ assert not job_dependencies
+
run_query = select(Run).order_by(Run.id).options(selectinload(Run.started_by_user))
run_scalars = await async_session.scalars(run_query)
runs = run_scalars.all()
diff --git a/tests/test_consumer/test_handlers/test_runs_handler_hive.py b/tests/test_consumer/test_handlers/test_runs_handler_hive.py
index 1ec20a78..4e731127 100644
--- a/tests/test_consumer/test_handlers/test_runs_handler_hive.py
+++ b/tests/test_consumer/test_handlers/test_runs_handler_hive.py
@@ -18,6 +18,7 @@
DatasetSymlinkType,
Input,
Job,
+ JobDependency,
Location,
Operation,
OperationStatus,
@@ -88,6 +89,11 @@ async def test_runs_handler_hive(
"openlineage_adapter.version": "1.35.0",
}
+ job_dependency_query = select(JobDependency).order_by(JobDependency.id)
+ job_dependency_scalars = await async_session.scalars(job_dependency_query)
+ job_dependencies = job_dependency_scalars.all()
+ assert not job_dependencies
+
run_query = select(Run).order_by(Run.id).options(selectinload(Run.started_by_user))
run_scalars = await async_session.scalars(run_query)
runs = run_scalars.all()
@@ -107,7 +113,7 @@ async def test_runs_handler_hive(
assert session_run.running_log_url is None
assert session_run.persistent_log_url is None
- sql_query = select(SQLQuery).order_by(SQLQuery.id)
+ sql_query = select(SQLQuery).order_by(SQLQuery.fingerprint)
sql_query_scalars = await async_session.scalars(sql_query)
sql_queries = sql_query_scalars.all()
assert len(sql_queries) == 1
diff --git a/tests/test_consumer/test_handlers/test_runs_handler_spark.py b/tests/test_consumer/test_handlers/test_runs_handler_spark.py
index 86eae396..6d19eea9 100644
--- a/tests/test_consumer/test_handlers/test_runs_handler_spark.py
+++ b/tests/test_consumer/test_handlers/test_runs_handler_spark.py
@@ -18,6 +18,7 @@
DatasetSymlinkType,
Input,
Job,
+ JobDependency,
Location,
Operation,
OperationStatus,
@@ -88,6 +89,11 @@ async def test_runs_handler_spark(
"openlineage_adapter.version": "1.19.0",
}
+ job_dependency_query = select(JobDependency).order_by(JobDependency.id)
+ job_dependency_scalars = await async_session.scalars(job_dependency_query)
+ job_dependencies = job_dependency_scalars.all()
+ assert not job_dependencies
+
run_query = select(Run).order_by(Run.id).options(selectinload(Run.started_by_user))
run_scalars = await async_session.scalars(run_query)
runs = run_scalars.all()
@@ -107,7 +113,7 @@ async def test_runs_handler_spark(
assert application_run.running_log_url == "http://127.0.0.1:4040"
assert application_run.persistent_log_url is None
- sql_query = select(SQLQuery).order_by(SQLQuery.id)
+ sql_query = select(SQLQuery).order_by(SQLQuery.fingerprint)
sql_query_scalars = await async_session.scalars(sql_query)
sql_queries = sql_query_scalars.all()
assert len(sql_queries) == 1
diff --git a/tests/test_consumer/test_handlers/test_runs_handler_unknown.py b/tests/test_consumer/test_handlers/test_runs_handler_unknown.py
index 84adf474..3d2ba084 100644
--- a/tests/test_consumer/test_handlers/test_runs_handler_unknown.py
+++ b/tests/test_consumer/test_handlers/test_runs_handler_unknown.py
@@ -17,6 +17,7 @@
DatasetSymlink,
Input,
Job,
+ JobDependency,
Location,
Operation,
OperationStatus,
@@ -85,6 +86,11 @@ async def test_runs_handler_unknown(
assert job.location.addresses[0].url == "unknown://unknown"
assert {tv.tag.name: tv.value for tv in jobs[0].tag_values} == {}
+ job_dependency_query = select(JobDependency).order_by(JobDependency.id)
+ job_dependency_scalars = await async_session.scalars(job_dependency_query)
+ job_dependencies = job_dependency_scalars.all()
+ assert not job_dependencies
+
run_query = select(Run).order_by(Run.id).options(selectinload(Run.started_by_user))
run_scalars = await async_session.scalars(run_query)
runs = run_scalars.all()