From 265620677e26ada4c34cd37dcf03a3ac00f31de4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Tue, 10 Mar 2026 16:25:48 +0300 Subject: [PATCH] [DOP-33903] Extract and save jobDependencies facet --- .../extractors/batch_extraction_result.py | 20 ++ .../consumer/extractors/generic/job.py | 11 +- .../consumer/extractors/generic/run.py | 46 ++- .../consumer/extractors/impl/airflow_task.py | 36 +- data_rentgen/consumer/extractors/impl/hive.py | 12 +- data_rentgen/consumer/saver.py | 10 + ...6_4e119cb7481e_add_job_dependency_table.py | 50 +++ data_rentgen/db/models/__init__.py | 2 + data_rentgen/db/models/job_dependency.py | 47 +++ .../db/repositories/job_dependency.py | 76 ++++ data_rentgen/db/scripts/seed/dbt.py | 16 +- data_rentgen/db/scripts/seed/spark_yarn.py | 23 +- data_rentgen/dto/__init__.py | 2 + data_rentgen/dto/job_dependency.py | 26 ++ data_rentgen/dto/run.py | 11 + .../openlineage/run_facets/__init__.py | 11 + .../openlineage/run_facets/airflow.py | 23 ++ .../openlineage/run_facets/job_dependency.py | 62 ++++ data_rentgen/services/uow.py | 2 + docs/changelog/next_release/402.feature.rst | 6 + docs/reference/database/structure.rst | 15 +- .../test_extractors_run_airflow.py | 324 ++++++++++++++++++ .../test_runs_handler_airflow.py | 16 + .../test_handlers/test_runs_handler_dbt.py | 8 +- .../test_handlers/test_runs_handler_flink.py | 6 + .../test_handlers/test_runs_handler_hive.py | 8 +- .../test_handlers/test_runs_handler_spark.py | 8 +- .../test_runs_handler_unknown.py | 6 + 28 files changed, 852 insertions(+), 31 deletions(-) create mode 100644 data_rentgen/db/migrations/versions/2026-03-06_4e119cb7481e_add_job_dependency_table.py create mode 100644 data_rentgen/db/models/job_dependency.py create mode 100644 data_rentgen/db/repositories/job_dependency.py create mode 100644 data_rentgen/dto/job_dependency.py create mode 100644 data_rentgen/openlineage/run_facets/job_dependency.py create mode 100644 docs/changelog/next_release/402.feature.rst diff --git a/data_rentgen/consumer/extractors/batch_extraction_result.py b/data_rentgen/consumer/extractors/batch_extraction_result.py index 76b1c6c2..1b1d7ecf 100644 --- a/data_rentgen/consumer/extractors/batch_extraction_result.py +++ b/data_rentgen/consumer/extractors/batch_extraction_result.py @@ -10,6 +10,7 @@ DatasetDTO, DatasetSymlinkDTO, InputDTO, + JobDependencyDTO, JobDTO, JobTypeDTO, LocationDTO, @@ -29,6 +30,7 @@ DatasetDTO, DatasetSymlinkDTO, InputDTO, + JobDependencyDTO, JobDTO, JobTypeDTO, LocationDTO, @@ -67,6 +69,7 @@ def __init__(self): self._dataset_symlinks: dict[tuple, DatasetSymlinkDTO] = {} self._job_types: dict[tuple, JobTypeDTO] = {} self._jobs: dict[tuple, JobDTO] = {} + self._job_dependencies: dict[tuple, JobDependencyDTO] = {} self._runs: dict[tuple, RunDTO] = {} self._operations: dict[tuple, OperationDTO] = {} self._inputs: dict[tuple, InputDTO] = {} @@ -86,6 +89,7 @@ def __repr__(self): f"dataset_symlinks={len(self._dataset_symlinks)}, " f"job_types={len(self._job_types)}, " f"jobs={len(self._jobs)}, " + f"job_dependencies={len(self._job_dependencies)}, " f"runs={len(self._runs)}, " f"operations={len(self._operations)}, " f"inputs={len(self._inputs)}, " @@ -139,12 +143,19 @@ def add_job(self, job: JobDTO): job.tag_values = {self.add_tag_value(tag_value) for tag_value in job.tag_values} return self._add(self._jobs, job) + def add_job_dependency(self, job_dependency: JobDependencyDTO): + job_dependency.from_job = self.add_job(job_dependency.from_job) + job_dependency.to_job = self.add_job(job_dependency.to_job) + return self._add(self._job_dependencies, job_dependency) + def add_run(self, run: RunDTO): run.job = self.add_job(run.job) if run.parent_run: run.parent_run = self.add_run(run.parent_run) if run.user: run.user = self.add_user(run.user) + for job_dependency in run.job_dependencies: + self.add_job_dependency(job_dependency) return self._add(self._runs, run) def add_operation(self, operation: OperationDTO): @@ -232,6 +243,12 @@ def get_job(self, job_key: tuple) -> JobDTO: job.tag_values = {self.get_tag_value(tag_value.unique_key) for tag_value in job.tag_values} return job + def get_job_dependency(self, job_dependency_key: tuple) -> JobDependencyDTO: + job_dependency = self._job_dependencies[job_dependency_key] + job_dependency.from_job = self.get_job(job_dependency.from_job.unique_key) + job_dependency.to_job = self.get_job(job_dependency.to_job.unique_key) + return job_dependency + def get_run(self, run_key: tuple) -> RunDTO: run = self._runs[run_key] run.job = self.get_job(run.job.unique_key) @@ -290,6 +307,9 @@ def job_types(self) -> list[JobTypeDTO]: def jobs(self) -> list[JobDTO]: return self._resolve(self.get_job, self._jobs) + def job_dependencies(self) -> list[JobDependencyDTO]: + return self._resolve(self.get_job_dependency, self._job_dependencies) + def runs(self) -> list[RunDTO]: return self._resolve(self.get_run, self._runs) diff --git a/data_rentgen/consumer/extractors/generic/job.py b/data_rentgen/consumer/extractors/generic/job.py index 999a5f0b..dd719d7a 100644 --- a/data_rentgen/consumer/extractors/generic/job.py +++ b/data_rentgen/consumer/extractors/generic/job.py @@ -13,6 +13,7 @@ ) from data_rentgen.openlineage.job import OpenLineageJob from data_rentgen.openlineage.run_facets import ( + OpenLineageJobIdentifier, OpenLineageParentJob, ) @@ -29,7 +30,10 @@ def extract_job(self, job: OpenLineageJob) -> JobDTO: ) return self._enrich_job_tags(job_dto, job) - def extract_parent_job(self, job: OpenLineageJob | OpenLineageParentJob) -> JobDTO: + def extract_pure_job( + self, + job: OpenLineageJob | OpenLineageParentJob | OpenLineageJobIdentifier, + ) -> JobDTO: """ Extract JobDTO from parent job reference """ @@ -38,7 +42,10 @@ def extract_parent_job(self, job: OpenLineageJob | OpenLineageParentJob) -> JobD location=self._extract_job_location(job), ) - def _extract_job_location(self, job: OpenLineageJob | OpenLineageParentJob) -> LocationDTO: + def _extract_job_location( + self, + job: OpenLineageJob | OpenLineageParentJob | OpenLineageJobIdentifier, + ) -> LocationDTO: # hostname and scheme are normalized to lowercase for uniqueness url = urlparse(job.namespace.lower()) scheme = url.scheme or "unknown" diff --git a/data_rentgen/consumer/extractors/generic/run.py b/data_rentgen/consumer/extractors/generic/run.py index 9d6e9979..e0deca7f 100644 --- a/data_rentgen/consumer/extractors/generic/run.py +++ b/data_rentgen/consumer/extractors/generic/run.py @@ -5,6 +5,7 @@ from abc import ABC, abstractmethod from data_rentgen.dto import ( + JobDependencyDTO, JobDTO, RunDTO, RunStatusDTO, @@ -17,10 +18,11 @@ OpenLineageRunEventType, ) from data_rentgen.openlineage.run_facets import ( + OpenLineageJobIdentifier, OpenLineageParentJob, OpenLineageParentRunFacet, + OpenLineageRunTagsFacetField, ) -from data_rentgen.openlineage.run_facets.run_tags import OpenLineageRunTagsFacetField class RunExtractorMixin(ABC): @@ -29,18 +31,14 @@ def extract_job(self, job: OpenLineageJob) -> JobDTO: pass @abstractmethod - def extract_parent_job(self, job: OpenLineageJob | OpenLineageParentJob) -> JobDTO: + def extract_pure_job(self, job: OpenLineageJob | OpenLineageParentJob | OpenLineageJobIdentifier) -> JobDTO: pass def extract_run(self, event: OpenLineageRunEvent) -> RunDTO: """ Extract RunDTO from specific event """ - run = RunDTO( - id=event.run.runId, # type: ignore [arg-type] - job=self.extract_job(event.job), - parent_run=self.extract_parent_run(event.run.facets.parent) if event.run.facets.parent else None, - ) + run = self.extract_pure_run(event) if run.parent_run: run.job.parent_job = run.parent_run.job self._enrich_run_status(run, event) @@ -49,15 +47,23 @@ def extract_run(self, event: OpenLineageRunEvent) -> RunDTO: self._add_openlineage_client_version_tag(run, event) self._enrich_run_tags(run, event) self._enrich_nominal_times(run, event) + self._enrich_job_dependencies(run, event) return run + def extract_pure_run(self, event: OpenLineageRunEvent) -> RunDTO: + return RunDTO( + id=event.run.runId, # type: ignore [arg-type] + job=self.extract_job(event.job), + parent_run=self.extract_parent_run(event.run.facets.parent) if event.run.facets.parent else None, + ) + def extract_parent_run(self, facet: OpenLineageParentRunFacet | OpenLineageRunEvent) -> RunDTO: """ Extract RunDTO from parent run reference """ return RunDTO( id=facet.run.runId, - job=self.extract_parent_job(facet.job), + job=self.extract_pure_job(facet.job), ) def _enrich_run_status(self, run: RunDTO, event: OpenLineageRunEvent) -> RunDTO: @@ -146,3 +152,27 @@ def _enrich_nominal_times(self, run: RunDTO, event: OpenLineageRunEvent) -> RunD run.expected_end_at = None return run + + def _enrich_job_dependencies(self, run: RunDTO, event: OpenLineageRunEvent) -> RunDTO: + if not event.run.facets.jobDependencies: + return run + + for upstream in event.run.facets.jobDependencies.upstream: + run.job_dependencies.append( + JobDependencyDTO( + from_job=self.extract_pure_job(upstream.job), + to_job=run.job, + type=upstream.dependency_type, + ), + ) + + for downstream in event.run.facets.jobDependencies.downstream: + run.job_dependencies.append( + JobDependencyDTO( + from_job=run.job, + to_job=self.extract_pure_job(downstream.job), + type=downstream.dependency_type, + ), + ) + + return run diff --git a/data_rentgen/consumer/extractors/impl/airflow_task.py b/data_rentgen/consumer/extractors/impl/airflow_task.py index 8a5f99d4..d9f04b08 100644 --- a/data_rentgen/consumer/extractors/impl/airflow_task.py +++ b/data_rentgen/consumer/extractors/impl/airflow_task.py @@ -8,7 +8,8 @@ from data_rentgen.consumer.extractors.generic import GenericExtractor from data_rentgen.consumer.extractors.impl.utils import parse_kv_tag -from data_rentgen.dto import JobDTO, OperationDTO, RunDTO, RunStartReasonDTO, UserDTO +from data_rentgen.dto import JobDTO, JobTypeDTO, OperationDTO, RunDTO, RunStartReasonDTO, UserDTO +from data_rentgen.dto.job_dependency import JobDependencyDTO from data_rentgen.openlineage.job import OpenLineageJob from data_rentgen.openlineage.job_facets import OpenLineageJobTagsFacetField from data_rentgen.openlineage.run_event import OpenLineageRunEvent @@ -211,3 +212,36 @@ def _enrich_run_tags(self, run: RunDTO, event: OpenLineageRunEvent) -> RunDTO: # facets are immutable object.__setattr__(event.run.facets, "tags", OpenLineageRunTagsFacet(tags=run_tags)) return super()._enrich_run_tags(run, event) + + def _enrich_job_dependencies(self, run: RunDTO, event: OpenLineageRunEvent) -> RunDTO: + run = super()._enrich_job_dependencies(run, event) + if event.run.facets.airflow: + # https://github.com/apache/airflow/pull/59521 brings up jobDependency facet, + # but it still doesn't contain direct task -> task dependencies + dag_info = event.run.facets.airflow.dag + task_info = event.run.facets.airflow.task + for upstream_task_id in task_info.upstream_task_ids: + run.job_dependencies.append( + JobDependencyDTO( + from_job=JobDTO( + name=f"{dag_info.dag_id}.{upstream_task_id}", + location=run.job.location, + type=JobTypeDTO("AIRFLOW_TASK"), + ), + to_job=run.job, + type="DIRECT_DEPENDENCY", + ), + ) + for downstream_task_id in task_info.downstream_task_ids: + run.job_dependencies.append( + JobDependencyDTO( + from_job=run.job, + to_job=JobDTO( + name=f"{dag_info.dag_id}.{downstream_task_id}", + location=run.job.location, + type=JobTypeDTO("AIRFLOW_TASK"), + ), + type="DIRECT_DEPENDENCY", + ), + ) + return run diff --git a/data_rentgen/consumer/extractors/impl/hive.py b/data_rentgen/consumer/extractors/impl/hive.py index d1ebca87..6f196d1b 100644 --- a/data_rentgen/consumer/extractors/impl/hive.py +++ b/data_rentgen/consumer/extractors/impl/hive.py @@ -34,7 +34,7 @@ def is_operation(self, event: OpenLineageRunEvent) -> bool: # All events produced by Hive integration are queries == operations return True - def extract_run(self, event: OpenLineageRunEvent) -> RunDTO: + def extract_pure_run(self, event: OpenLineageRunEvent) -> RunDTO: # Hive produce only hive query events, but no events for hive session: # https://github.com/OpenLineage/OpenLineage/issues/3784 # But we treat queries as operations, and operations should be bound to run (session) for grouping. @@ -55,7 +55,7 @@ def extract_run(self, event: OpenLineageRunEvent) -> RunDTO: if hive_session.username not in ("anonymous", "hive"): user = UserDTO(name=hive_session.username) - run = RunDTO( + return RunDTO( id=run_id, job=JobDTO( name=job_name, @@ -68,12 +68,8 @@ def extract_run(self, event: OpenLineageRunEvent) -> RunDTO: external_id=hive_session.sessionId, user=user, ) - if run.parent_run: - run.job.parent_job = run.parent_run.job - self._add_engine_version_tag(run, event) - self._add_openlineage_adapter_version_tag(run, event) - self._add_openlineage_client_version_tag(run, event) - self._enrich_run_tags(run, event) + + def _enrich_run_status(self, run: RunDTO, event: OpenLineageRunEvent): return run def extract_operation(self, event: OpenLineageRunEvent) -> OperationDTO: diff --git a/data_rentgen/consumer/saver.py b/data_rentgen/consumer/saver.py index f50be125..de09c65d 100644 --- a/data_rentgen/consumer/saver.py +++ b/data_rentgen/consumer/saver.py @@ -32,6 +32,7 @@ async def save(self, data: BatchExtractionResult): await self.create_dataset_symlinks(data) await self.create_job_types(data) await self.create_jobs(data) + await self.create_job_dependencies(data) await self.create_users(data) await self.create_sql_queries(data) await self.create_schemas(data) @@ -113,6 +114,15 @@ async def create_jobs(self, data: BatchExtractionResult): job = await self.unit_of_work.job.update(job, job_dto) # noqa: PLW2901 job_dto.id = job.id + async def create_job_dependencies(self, data: BatchExtractionResult): + self.logger.debug("Creating job dependencies") + job_dependency_pairs = await self.unit_of_work.job_dependency.fetch_bulk(data.job_dependencies()) + for job_dependency_dto, job_dependency in job_dependency_pairs: + if not job_dependency: + async with self.unit_of_work: + job_dependency = await self.unit_of_work.job_dependency.create(job_dependency_dto) # noqa: PLW2901 + job_dependency_dto.id = job_dependency.id + async def create_users(self, data: BatchExtractionResult): self.logger.debug("Creating users") user_pairs = await self.unit_of_work.user.fetch_bulk(data.users()) diff --git a/data_rentgen/db/migrations/versions/2026-03-06_4e119cb7481e_add_job_dependency_table.py b/data_rentgen/db/migrations/versions/2026-03-06_4e119cb7481e_add_job_dependency_table.py new file mode 100644 index 00000000..01c83220 --- /dev/null +++ b/data_rentgen/db/migrations/versions/2026-03-06_4e119cb7481e_add_job_dependency_table.py @@ -0,0 +1,50 @@ +# SPDX-FileCopyrightText: 2024-present MTS PJSC +# SPDX-License-Identifier: Apache-2.0 +"""Add job_dependency table + +Revision ID: 4e119cb7481e +Revises: a1950f06a8cb +Create Date: 2026-03-06 17:39:31.296534 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "4e119cb7481e" +down_revision = "a1950f06a8cb" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "job_dependency", + sa.Column("id", sa.BigInteger(), nullable=False), + sa.Column("from_job_id", sa.BigInteger(), nullable=False), + sa.Column("to_job_id", sa.BigInteger(), nullable=False), + sa.Column("type", sa.String(), nullable=True), + sa.ForeignKeyConstraint( + ["from_job_id"], + ["job.id"], + name=op.f("fk__job_dependency__from_job_id__job"), + ondelete="CASCADE", + ), + sa.ForeignKeyConstraint( + ["to_job_id"], + ["job.id"], + name=op.f("fk__job_dependency__to_job_id__job"), + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("id", name=op.f("pk__job_dependency")), + sa.UniqueConstraint("from_job_id", "to_job_id", name=op.f("uq__job_dependency__from_job_id_to_job_id")), + ) + op.create_index(op.f("ix__job_dependency__from_job_id"), "job_dependency", ["from_job_id"], unique=False) + op.create_index(op.f("ix__job_dependency__to_job_id"), "job_dependency", ["to_job_id"], unique=False) + + +def downgrade() -> None: + op.drop_index(op.f("ix__job_dependency__to_job_id"), table_name="job_dependency") + op.drop_index(op.f("ix__job_dependency__from_job_id"), table_name="job_dependency") + op.drop_table("job_dependency") diff --git a/data_rentgen/db/models/__init__.py b/data_rentgen/db/models/__init__.py index 4a2bbe21..091963e5 100644 --- a/data_rentgen/db/models/__init__.py +++ b/data_rentgen/db/models/__init__.py @@ -14,6 +14,7 @@ from data_rentgen.db.models.dataset_symlink import DatasetSymlink, DatasetSymlinkType from data_rentgen.db.models.input import Input from data_rentgen.db.models.job import Job, JobTagValue +from data_rentgen.db.models.job_dependency import JobDependency from data_rentgen.db.models.job_type import JobType from data_rentgen.db.models.location import Location from data_rentgen.db.models.operation import Operation, OperationStatus, OperationType @@ -40,6 +41,7 @@ "DatasetTagValue", "Input", "Job", + "JobDependency", "JobLastRun", "JobTagValue", "JobType", diff --git a/data_rentgen/db/models/job_dependency.py b/data_rentgen/db/models/job_dependency.py new file mode 100644 index 00000000..f79d67b5 --- /dev/null +++ b/data_rentgen/db/models/job_dependency.py @@ -0,0 +1,47 @@ +# SPDX-FileCopyrightText: 2024-present MTS PJSC +# SPDX-License-Identifier: Apache-2.0 + + +from sqlalchemy import BigInteger, ForeignKey, String, UniqueConstraint +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from data_rentgen.db.models.base import Base +from data_rentgen.db.models.job import Job + + +class JobDependency(Base): + __tablename__ = "job_dependency" + __table_args__ = (UniqueConstraint("from_job_id", "to_job_id"),) + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + from_job_id: Mapped[int] = mapped_column( + BigInteger, + ForeignKey("job.id", ondelete="CASCADE"), + index=True, + nullable=False, + doc="From job id", + ) + from_job: Mapped[Job] = relationship( + Job, + lazy="noload", + foreign_keys=[from_job_id], + ) + + to_job_id: Mapped[int] = mapped_column( + BigInteger, + ForeignKey("job.id", ondelete="CASCADE"), + index=True, + nullable=False, + doc="To job id", + ) + to_job: Mapped[Job] = relationship( + Job, + lazy="noload", + foreign_keys=[to_job_id], + ) + + type: Mapped[str] = mapped_column( + String, + nullable=True, + doc="Dependency type", + ) diff --git a/data_rentgen/db/repositories/job_dependency.py b/data_rentgen/db/repositories/job_dependency.py new file mode 100644 index 00000000..74a6cbc5 --- /dev/null +++ b/data_rentgen/db/repositories/job_dependency.py @@ -0,0 +1,76 @@ +# SPDX-FileCopyrightText: 2024-present MTS PJSC +# SPDX-License-Identifier: Apache-2.0 + + +from sqlalchemy import ARRAY, Integer, bindparam, cast, func, select, tuple_ + +from data_rentgen.db.models.job_dependency import JobDependency +from data_rentgen.db.repositories.base import Repository +from data_rentgen.dto import JobDependencyDTO + +fetch_bulk_query = select(JobDependency).where( + tuple_(JobDependency.from_job_id, JobDependency.to_job_id).in_( + select( + func.unnest( + cast(bindparam("from_job_ids"), ARRAY(Integer())), + cast(bindparam("to_job_ids"), ARRAY(Integer())), + ) + .table_valued("from_job_ids", "to_job_ids") + .render_derived(), + ), + ), +) + +get_one_query = select(JobDependency).where( + JobDependency.from_job_id == bindparam("from_job_id"), + JobDependency.to_job_id == bindparam("to_job_id"), +) + + +class JobDependencyRepository(Repository[JobDependency]): + async def fetch_bulk( + self, + job_dependencies_dto: list[JobDependencyDTO], + ) -> list[tuple[JobDependencyDTO, JobDependency | None]]: + if not job_dependencies_dto: + return [] + + scalars = await self._session.scalars( + fetch_bulk_query, + { + "from_job_ids": [item.from_job.id for item in job_dependencies_dto], + "to_job_ids": [item.to_job.id for item in job_dependencies_dto], + }, + ) + existing = {(item.from_job_id, item.to_job_id): item for item in scalars.all()} + return [ + ( + dto, + existing.get((dto.from_job.id, dto.to_job.id)), # type: ignore[arg-type] + ) + for dto in job_dependencies_dto + ] + + async def create(self, job_dependency: JobDependencyDTO) -> JobDependency: + # if another worker already created the same row, just use it. if not - create with holding the lock. + await self._lock(job_dependency.from_job.id, job_dependency.to_job.id) + return await self._get(job_dependency) or await self._create(job_dependency) + + async def _get(self, job_dependency: JobDependencyDTO) -> JobDependency | None: + return await self._session.scalar( + get_one_query, + { + "from_job_id": job_dependency.from_job.id, + "to_job_id": job_dependency.to_job.id, + }, + ) + + async def _create(self, job_dependency: JobDependencyDTO) -> JobDependency: + result = JobDependency( + from_job_id=job_dependency.from_job.id, + to_job_id=job_dependency.to_job.id, + type=job_dependency.type, + ) + self._session.add(result) + await self._session.flush([result]) + return result diff --git a/data_rentgen/db/scripts/seed/dbt.py b/data_rentgen/db/scripts/seed/dbt.py index e28d78f2..0d5631a2 100644 --- a/data_rentgen/db/scripts/seed/dbt.py +++ b/data_rentgen/db/scripts/seed/dbt.py @@ -9,6 +9,7 @@ from faker import Faker from data_rentgen.consumer.extractors import BatchExtractionResult +from data_rentgen.db.scripts.seed.airflow import generate_airflow_run from data_rentgen.dto import ( ColumnLineageDTO, DatasetColumnRelationDTO, @@ -92,10 +93,21 @@ def generate_dbt_run( start: datetime, end: datetime, ) -> BatchExtractionResult: + run_created_at = faker.date_time_between(start, end, tzinfo=UTC) + run_started_at = run_created_at + timedelta(minutes=faker.pyfloat(min_value=0, max_value=3)) + run_ended_at = run_started_at + timedelta(minutes=faker.pyfloat(min_value=30, max_value=35)) + parent_run = generate_airflow_run( + "mart_layer_dag", + "mart_layer_task_dbt", + run_created_at - timedelta(seconds=faker.pyint(min_value=5, max_value=10)), + run_ended_at + timedelta(seconds=faker.pyint(min_value=5, max_value=10)), + ) + job = JobDTO( name="dbt-run-user_metrics", location=LOCATIONS["local"], type=JobTypeDTO(type="DBT_JOB"), + parent_job=parent_run.job, tag_values={ TagValueDTO( tag=TagDTO(name="dbt.version"), @@ -116,13 +128,11 @@ def generate_dbt_run( }, ) - run_created_at = faker.date_time_between(start, end, tzinfo=UTC) - run_started_at = run_created_at + timedelta(minutes=faker.pyfloat(min_value=0, max_value=3)) - run_ended_at = run_started_at + timedelta(minutes=faker.pyfloat(min_value=30, max_value=35)) run_id = generate_new_uuid(run_created_at) run = RunDTO( id=run_id, job=job, + parent_run=parent_run, status=RunStatusDTO.SUCCEEDED, external_id=str(uuid4()), started_at=run_started_at, diff --git a/data_rentgen/db/scripts/seed/spark_yarn.py b/data_rentgen/db/scripts/seed/spark_yarn.py index 2244128c..37381e5d 100644 --- a/data_rentgen/db/scripts/seed/spark_yarn.py +++ b/data_rentgen/db/scripts/seed/spark_yarn.py @@ -17,6 +17,7 @@ DatasetSymlinkDTO, DatasetSymlinkTypeDTO, InputDTO, + JobDependencyDTO, JobDTO, JobTypeDTO, LocationDTO, @@ -179,9 +180,9 @@ def generate_spark_run_yarn( run_created_at = faker.date_time_between(start, end, tzinfo=UTC) run_started_at = run_created_at + timedelta(minutes=faker.pyfloat(min_value=0, max_value=3)) run_ended_at = run_started_at + timedelta(minutes=faker.pyfloat(min_value=10, max_value=12)) - paren_run = generate_airflow_run( + parent_run = generate_airflow_run( "mart_layer_dag", - "mart_layer_task", + "mart_layer_task_spark", run_created_at - timedelta(seconds=faker.pyint(min_value=5, max_value=10)), run_ended_at + timedelta(seconds=faker.pyint(min_value=5, max_value=10)), ) @@ -190,7 +191,7 @@ def generate_spark_run_yarn( name="mart_layer_loader", location=LOCATIONS["yarn"], type=JobTypeDTO(type="SPARK_APPLICATION"), - parent_job=paren_run.job, + parent_job=parent_run.job, tag_values={ TagValueDTO( tag=TagDTO(name="spark.version"), @@ -207,13 +208,27 @@ def generate_spark_run_yarn( }, ) + # add Airflow Task mart_layer_dag.mart_layer_dbt -> mart_layer_dag.mart_layer_task_spark dependency + parent_run.job_dependencies.append( + JobDependencyDTO( + from_job=JobDTO( + name="mart_layer_dag.mart_layer_dbt", + parent_job=parent_run.job.parent_job, # DAG + location=parent_run.job.location, + type=JobTypeDTO(type="AIRFLOW_TASK"), + ), + to_job=job, + type="DIRECT_DEPENDENCY", + ) + ) + run_id = generate_new_uuid(run_created_at) external_id = f"application_{run_created_at.timestamp() * 1000}_0001" run = RunDTO( id=run_id, job=job, status=RunStatusDTO.SUCCEEDED, - parent_run=paren_run, + parent_run=parent_run, external_id=external_id, running_log_url=f"http://{faker.ipv4_private()}:{faker.port_number(is_user=True)}", persistent_log_url=f"http://mn01.hadoop.companyname.com:8088/proxy/{external_id}", diff --git a/data_rentgen/dto/__init__.py b/data_rentgen/dto/__init__.py index 3cc5a8be..74ae98c5 100644 --- a/data_rentgen/dto/__init__.py +++ b/data_rentgen/dto/__init__.py @@ -10,6 +10,7 @@ from data_rentgen.dto.dataset_symlink import DatasetSymlinkDTO, DatasetSymlinkTypeDTO from data_rentgen.dto.input import InputDTO from data_rentgen.dto.job import JobDTO +from data_rentgen.dto.job_dependency import JobDependencyDTO from data_rentgen.dto.job_type import JobTypeDTO from data_rentgen.dto.location import LocationDTO from data_rentgen.dto.operation import ( @@ -34,6 +35,7 @@ "DatasetSymlinkTypeDTO", "InputDTO", "JobDTO", + "JobDependencyDTO", "JobTypeDTO", "LocationDTO", "OperationDTO", diff --git a/data_rentgen/dto/job_dependency.py b/data_rentgen/dto/job_dependency.py new file mode 100644 index 00000000..aed6fa4c --- /dev/null +++ b/data_rentgen/dto/job_dependency.py @@ -0,0 +1,26 @@ +# SPDX-FileCopyrightText: 2024-present MTS PJSC +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from dataclasses import dataclass, field + +from data_rentgen.dto.job import JobDTO + + +@dataclass(slots=True) +class JobDependencyDTO: + from_job: JobDTO + to_job: JobDTO + type: str | None = None + id: int | None = field(default=None, compare=False) + + @property + def unique_key(self) -> tuple: + return (self.from_job.unique_key, self.to_job.unique_key, self.type) + + def merge(self, new: JobDependencyDTO) -> JobDependencyDTO: + self.from_job = self.from_job.merge(new.from_job) + self.to_job = self.to_job.merge(new.to_job) + self.id = new.id or self.id + return self diff --git a/data_rentgen/dto/run.py b/data_rentgen/dto/run.py index c89573fc..45a73d25 100644 --- a/data_rentgen/dto/run.py +++ b/data_rentgen/dto/run.py @@ -9,6 +9,7 @@ from uuid import UUID from data_rentgen.dto.job import JobDTO +from data_rentgen.dto.job_dependency import JobDependencyDTO from data_rentgen.dto.user import UserDTO from data_rentgen.utils.uuid import extract_timestamp_from_uuid @@ -46,6 +47,7 @@ class RunDTO: persistent_log_url: str | None = None expected_start_at: datetime | None = None expected_end_at: datetime | None = None + job_dependencies: list[JobDependencyDTO] = field(default_factory=list) def __post_init__(self): self.created_at = extract_timestamp_from_uuid(self.id) @@ -66,6 +68,15 @@ def merge(self, new: RunDTO) -> RunDTO: else: self.user = new.user or self.user + existing_dependencies = {item.unique_key: item for item in self.job_dependencies} + merged_dependencies = [] + for job_dependency in new.job_dependencies: + if job_dependency.unique_key in existing_dependencies: + merged_dependencies.append(existing_dependencies[job_dependency.unique_key].merge(job_dependency)) + else: + merged_dependencies.append(job_dependency) + self.job_dependencies = merged_dependencies + self.status = max(new.status, self.status) self.started_at = new.started_at or self.started_at self.start_reason = new.start_reason or self.start_reason diff --git a/data_rentgen/openlineage/run_facets/__init__.py b/data_rentgen/openlineage/run_facets/__init__.py index 482fd143..d5b0dc35 100644 --- a/data_rentgen/openlineage/run_facets/__init__.py +++ b/data_rentgen/openlineage/run_facets/__init__.py @@ -21,6 +21,12 @@ ) from data_rentgen.openlineage.run_facets.hive_query import OpenLineageHiveQueryInfoRunFacet from data_rentgen.openlineage.run_facets.hive_session import OpenLineageHiveSessionInfoRunFacet +from data_rentgen.openlineage.run_facets.job_dependency import ( + OpenLineageJobDependenciesRunFacet, + OpenLineageJobDependency, + OpenLineageJobIdentifier, + OpenLineageRunIdentifier, +) from data_rentgen.openlineage.run_facets.nominal_time import OpenLineageNominalTimeRunFacet from data_rentgen.openlineage.run_facets.parent_run import ( OpenLineageParentJob, @@ -52,6 +58,9 @@ "OpenLineageAirflowTaskRunFacet", "OpenLineageDbtRunRunFacet", "OpenLineageFlinkJobDetailsRunFacet", + "OpenLineageJobDependenciesRunFacet", + "OpenLineageJobDependency", + "OpenLineageJobIdentifier", "OpenLineageNominalTimeRunFacet", "OpenLineageParentJob", "OpenLineageParentRun", @@ -59,6 +68,7 @@ "OpenLineageProcessingEngineRunFacet", "OpenLineageRunFacet", "OpenLineageRunFacets", + "OpenLineageRunIdentifier", "OpenLineageRunTagsFacet", "OpenLineageRunTagsFacetField", "OpenLineageSparkApplicationDetailsRunFacet", @@ -86,3 +96,4 @@ class OpenLineageRunFacets(OpenLineageBase): hive_query: OpenLineageHiveQueryInfoRunFacet | None = None hive_session: OpenLineageHiveSessionInfoRunFacet | None = None nominalTime: OpenLineageNominalTimeRunFacet | None = None + jobDependencies: OpenLineageJobDependenciesRunFacet | None = None diff --git a/data_rentgen/openlineage/run_facets/airflow.py b/data_rentgen/openlineage/run_facets/airflow.py index 641521af..472b571b 100644 --- a/data_rentgen/openlineage/run_facets/airflow.py +++ b/data_rentgen/openlineage/run_facets/airflow.py @@ -81,6 +81,29 @@ class OpenLineageAirflowTaskInfo(OpenLineageBase): task_id: str = Field(examples=["my_task"]) operator_class: str | None = Field(default=None, examples=["MyOperator"]) task_group: OpenLineageAirflowTaskGroupInfo | None = None + downstream_task_ids: list[str] = Field(default_factory=list, examples=["downstream_task1", "downstream_task2"]) + upstream_task_ids: list[str] = Field(default_factory=list, examples=["upstream_task1", "upstream_task2"]) + + @field_validator("downstream_task_ids", "upstream_task_ids", mode="before") + @classmethod + def _validate_task_ids(cls, value: list[str] | str) -> list[str]: + """ + + At different moments time tags could be passed in formats: + * ``["task1", "task2"]`` - as expected by JSON spec + * ``"['task1', 'task2']"`` - `str(list)` representation + * ``'["task1", "task2"]'`` - `str(list)` representation if tag contains single quotes + + See: + * https://github.com/apache/airflow/pull/40371 + * https://github.com/apache/airflow/pull/40854 + * https://github.com/apache/airflow/pull/41786 + """ + if isinstance(value, list): + return value + with suppress(SyntaxError): + return ast.literal_eval(value) + return [] class OpenLineageAirflowTaskInstanceInfo(OpenLineageBase): diff --git a/data_rentgen/openlineage/run_facets/job_dependency.py b/data_rentgen/openlineage/run_facets/job_dependency.py new file mode 100644 index 00000000..94f08d4a --- /dev/null +++ b/data_rentgen/openlineage/run_facets/job_dependency.py @@ -0,0 +1,62 @@ +# SPDX-FileCopyrightText: 2024-present MTS PJSC +# SPDX-License-Identifier: Apache-2.0 + +from pydantic import UUID7, Field + +from data_rentgen.openlineage.base import OpenLineageBase +from data_rentgen.openlineage.run_facets.base import OpenLineageRunFacet + + +class OpenLineageJobIdentifier(OpenLineageBase): + """Job identifier. + See [JobDependenciesRunFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/JobDependenciesRunFacet.json). + """ + + namespace: str = Field(examples=["http://my-airflow.domain.com:8081"], json_schema_extra={"format": "uri"}) + name: str = Field(examples=["my_dag.my_task"]) + + +class OpenLineageRunIdentifier(OpenLineageBase): + """Run identifier. + See [JobDependenciesRunFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/JobDependenciesRunFacet.json). + """ + + runId: UUID7 = Field(examples=["019867d4-1b59-71fe-bc30-3fbd38703700"]) + + +class OpenLineageJobDependency(OpenLineageRunFacet): + """Job dependency item. + See [JobDependenciesRunFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/JobDependenciesRunFacet.json). + """ + + job: OpenLineageJobIdentifier + run: OpenLineageRunIdentifier | None = None + dependency_type: str | None = Field( + default=None, + examples=["DIRECT_INVOCATION", "IMPLICIT_DEPENDENCY"], + description="Implementation specific", + ) + sequence_trigger_rule: str | None = Field( + default=None, + examples=["FINISH_TO_START", "FINISH_TO_FINISH", "START_TO_START"], + description="Currently ignored", + ) + status_trigger_rule: str | None = Field( + default=None, + examples=["EXECUTE_EVERY_TIME", "EXECUTE_ON_SUCCESS", "EXECUTE_ON_FAILURE"], + description="Currently ignored", + ) + + +class OpenLineageJobDependenciesRunFacet(OpenLineageRunFacet): + """Run facet describing job dependencies. + See [JobDependenciesRunFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/JobDependenciesRunFacet.json). + """ + + upstream: list[OpenLineageJobDependency] = Field(default_factory=list) + downstream: list[OpenLineageJobDependency] = Field(default_factory=list) + trigger_rule: str | None = Field( + default=None, + examples=["ALL_SUCCESS", "ALL_DONE", "ONE_SUCCESS", "NONE_FAILED"], + description="Currently ignored", + ) diff --git a/data_rentgen/services/uow.py b/data_rentgen/services/uow.py index 58ea4fdd..ccac9c42 100644 --- a/data_rentgen/services/uow.py +++ b/data_rentgen/services/uow.py @@ -15,6 +15,7 @@ from data_rentgen.db.repositories.input import InputRepository from data_rentgen.db.repositories.io_dataset_relation import IODatasetRelationRepository from data_rentgen.db.repositories.job import JobRepository +from data_rentgen.db.repositories.job_dependency import JobDependencyRepository from data_rentgen.db.repositories.job_type import JobTypeRepository from data_rentgen.db.repositories.location import LocationRepository from data_rentgen.db.repositories.operation import OperationRepository @@ -38,6 +39,7 @@ def __init__( self.location = LocationRepository(session) self.job_type = JobTypeRepository(session) self.job = JobRepository(session) + self.job_dependency = JobDependencyRepository(session) self.run = RunRepository(session) self.operation = OperationRepository(session) self.dataset = DatasetRepository(session) diff --git a/docs/changelog/next_release/402.feature.rst b/docs/changelog/next_release/402.feature.rst new file mode 100644 index 00000000..6c50b46f --- /dev/null +++ b/docs/changelog/next_release/402.feature.rst @@ -0,0 +1,6 @@ +Extract information from `jobDependencies `_ facet, and store it in ``job_dependency`` table. +For now this is just a simple tuple `from_dataset_id`, `to_dataset_id, `type` (arbitrary string provided by integration, not enum). This can be changed in future versions of Data.Rentgen. + +Currently the only integration providing this kind of information is Airflow. But it is implemented only in most recent version of OpenLineage provider for Airflow (`2.10 or higher `_). +For now provides also doesn't send facet with information about direct task -> task dependencies - only indirect ones are included (declared via `Asset `_). +So there is a fallback for Airflow which extracts these dependencies from ``downstream_task_ids`` and ``upstream_task_ids`` task fields. diff --git a/docs/reference/database/structure.rst b/docs/reference/database/structure.rst index d021cfd9..f04c6215 100644 --- a/docs/reference/database/structure.rst +++ b/docs/reference/database/structure.rst @@ -197,6 +197,14 @@ Database structure revoked_at: timestamptz } + entity job_dependency { + * id: bigint + ---- + * from_dataset_id: bigint + * to_dataset_id: bigint + type: text null + } + address ||--o{ location dataset ||--o{ location @@ -232,9 +240,12 @@ Database structure column_lineage "fingerprint" ||--o{ dataset_column_relation tag_value ||--o{ tag - dataset_tagvalue ||--o{ tag_value - job_tagvalue ||--o{ tag_value + dataset_tag_value ||--o{ tag_value + job_tag_value ||--o{ tag_value personal_token ||--o{ user + job_dependency "from_job_id" ||--o{ job + job_dependency "to_job_id" ||--o{ job + @enduml diff --git a/tests/test_consumer/test_extractors/test_extractors_run_airflow.py b/tests/test_consumer/test_extractors/test_extractors_run_airflow.py index 168c1e54..0aaeed78 100644 --- a/tests/test_consumer/test_extractors/test_extractors_run_airflow.py +++ b/tests/test_consumer/test_extractors/test_extractors_run_airflow.py @@ -18,6 +18,7 @@ TagValueDTO, UserDTO, ) +from data_rentgen.dto.job_dependency import JobDependencyDTO from data_rentgen.openlineage.job import OpenLineageJob from data_rentgen.openlineage.job_facets import ( OpenLineageJobFacets, @@ -43,6 +44,12 @@ OpenLineageProcessingEngineRunFacet, OpenLineageRunFacets, ) +from data_rentgen.openlineage.run_facets.job_dependency import ( + OpenLineageJobDependenciesRunFacet, + OpenLineageJobDependency, + OpenLineageJobIdentifier, + OpenLineageRunIdentifier, +) from data_rentgen.openlineage.run_facets.run_tags import OpenLineageRunTagsFacet, OpenLineageRunTagsFacetField @@ -1485,3 +1492,320 @@ def test_extractors_extract_run_airflow_task_nominal_times( expected_start_at=expected_start_at, expected_end_at=expected_end_at, ) + + +def test_extractors_extract_run_airflow_task_job_dependencies_ol_provider_2_10_plus(): + # https://github.com/apache/airflow/pull/59521 + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("01908223-0782-79b8-9495-b1c38aaee839") + run = OpenLineageRunEvent( + eventType=OpenLineageRunEventType.COMPLETE, + eventTime=now, + job=OpenLineageJob( + namespace="http://airflow-host:8081", + name="mydag.mytask", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=OpenLineageJobProcessingType.BATCH, + integration="AIRFLOW", + jobType="TASK", + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + processing_engine=OpenLineageProcessingEngineRunFacet( + version=Version("2.9.2"), + name="Airflow", + openlineageAdapterVersion=Version("2.3.0"), + ), + airflow=OpenLineageAirflowTaskRunFacet( + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), + dagRun=OpenLineageAirflowDagRunInfo( + run_id="scheduled__2024-07-05T09:04:13:979349+00:00", + run_type=OpenLineageAirflowDagRunType.SCHEDULED, + data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ), + task=OpenLineageAirflowTaskInfo( + task_id="mytask", + operator_class="BashOperator", + ), + taskInstance=OpenLineageAirflowTaskInstanceInfo( + try_number=1, + ), + ), + jobDependencies=OpenLineageJobDependenciesRunFacet( + upstream=[ + OpenLineageJobDependency( + job=OpenLineageJobIdentifier( + namespace="http://airflow-host:8081", + name="mydag.previous_task", + ), + run=OpenLineageRunIdentifier( + runId=UUID("01908223-0782-79b8-9495-b1c38aaee831"), + ), + dependency_type="IMPLICIT_DEPENDENCY", + sequence_trigger_rule="FINISH_TO_START", + status_trigger_rule="EXECUTE_EVERY_TIME", + ), + ], + downstream=[ + OpenLineageJobDependency( + job=OpenLineageJobIdentifier( + namespace="http://airflow-host:8081", + name="mydag.next_task", + ), + dependency_type="IMPLICIT_DEPENDENCY", + sequence_trigger_rule="FINISH_TO_START", + status_trigger_rule="EXECUTE_EVERY_TIME", + ), + ], + trigger_rule="ALL_SUCCESS", + ), + ), + ), + ) + + assert AirflowTaskExtractor().extract_run(run) == RunDTO( + id=run_id, + job=JobDTO( + name="mydag.mytask", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO(type="AIRFLOW_TASK"), + tag_values={ + TagValueDTO( + tag=TagDTO(name="airflow.version"), + value="2.9.2", + ), + TagValueDTO( + tag=TagDTO(name="openlineage_adapter.version"), + value="2.3.0", + ), + }, + ), + status=RunStatusDTO.SUCCEEDED, + started_at=None, + start_reason=RunStartReasonDTO.AUTOMATIC, + user=None, + ended_at=now, + external_id="scheduled__2024-07-05T09:04:13:979349+00:00", + attempt="1", + persistent_log_url=( + "http://airflow-host:8081/dags/mydag/grid?tab=logs&dag_run_id=scheduled__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00&task_id=mytask&map_index=-1" + ), + running_log_url=None, + job_dependencies=[ + JobDependencyDTO( + from_job=JobDTO( + name="mydag.previous_task", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + ), + to_job=JobDTO( + name="mydag.mytask", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO(type="AIRFLOW_TASK"), + tag_values={ + TagValueDTO( + tag=TagDTO(name="airflow.version"), + value="2.9.2", + ), + TagValueDTO( + tag=TagDTO(name="openlineage_adapter.version"), + value="2.3.0", + ), + }, + ), + type="IMPLICIT_DEPENDENCY", + ), + JobDependencyDTO( + from_job=JobDTO( + name="mydag.mytask", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO(type="AIRFLOW_TASK"), + tag_values={ + TagValueDTO( + tag=TagDTO(name="airflow.version"), + value="2.9.2", + ), + TagValueDTO( + tag=TagDTO(name="openlineage_adapter.version"), + value="2.3.0", + ), + }, + ), + to_job=JobDTO( + name="mydag.next_task", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + ), + type="IMPLICIT_DEPENDENCY", + ), + ], + ) + + +def test_extractors_extract_run_airflow_task_job_dependencies_ol_provider_below_2_10(): + now = datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc) + run_id = UUID("01908223-0782-79b8-9495-b1c38aaee839") + run = OpenLineageRunEvent( + eventType=OpenLineageRunEventType.COMPLETE, + eventTime=now, + job=OpenLineageJob( + namespace="http://airflow-host:8081", + name="mydag.mytask", + facets=OpenLineageJobFacets( + jobType=OpenLineageJobTypeJobFacet( + processingType=OpenLineageJobProcessingType.BATCH, + integration="AIRFLOW", + jobType="TASK", + ), + ), + ), + run=OpenLineageRun( + runId=run_id, + facets=OpenLineageRunFacets( + processing_engine=OpenLineageProcessingEngineRunFacet( + version=Version("2.9.2"), + name="Airflow", + openlineageAdapterVersion=Version("2.3.0"), + ), + airflow=OpenLineageAirflowTaskRunFacet( + dag=OpenLineageAirflowDagInfo(dag_id="mydag", owner="airflow"), + dagRun=OpenLineageAirflowDagRunInfo( + run_id="scheduled__2024-07-05T09:04:13:979349+00:00", + run_type=OpenLineageAirflowDagRunType.SCHEDULED, + data_interval_start=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + data_interval_end=datetime(2024, 7, 5, 9, 4, 13, 979349, tzinfo=timezone.utc), + ), + task=OpenLineageAirflowTaskInfo( + task_id="mytask", + operator_class="BashOperator", + downstream_task_ids=["next_task"], + upstream_task_ids=["previous_task"], + ), + taskInstance=OpenLineageAirflowTaskInstanceInfo( + try_number=1, + ), + ), + ), + ), + ) + + assert AirflowTaskExtractor().extract_run(run) == RunDTO( + id=run_id, + job=JobDTO( + name="mydag.mytask", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO(type="AIRFLOW_TASK"), + tag_values={ + TagValueDTO( + tag=TagDTO(name="airflow.version"), + value="2.9.2", + ), + TagValueDTO( + tag=TagDTO(name="openlineage_adapter.version"), + value="2.3.0", + ), + }, + ), + status=RunStatusDTO.SUCCEEDED, + started_at=None, + start_reason=RunStartReasonDTO.AUTOMATIC, + user=None, + ended_at=now, + external_id="scheduled__2024-07-05T09:04:13:979349+00:00", + attempt="1", + persistent_log_url=( + "http://airflow-host:8081/dags/mydag/grid?tab=logs&dag_run_id=scheduled__2024-07-05T09%3A04%3A13%3A979349%2B00%3A00&task_id=mytask&map_index=-1" + ), + running_log_url=None, + job_dependencies=[ + JobDependencyDTO( + from_job=JobDTO( + name="mydag.previous_task", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO(type="AIRFLOW_TASK"), + ), + to_job=JobDTO( + name="mydag.mytask", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO(type="AIRFLOW_TASK"), + tag_values={ + TagValueDTO( + tag=TagDTO(name="airflow.version"), + value="2.9.2", + ), + TagValueDTO( + tag=TagDTO(name="openlineage_adapter.version"), + value="2.3.0", + ), + }, + ), + type="DIRECT_DEPENDENCY", + ), + JobDependencyDTO( + from_job=JobDTO( + name="mydag.mytask", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO(type="AIRFLOW_TASK"), + tag_values={ + TagValueDTO( + tag=TagDTO(name="airflow.version"), + value="2.9.2", + ), + TagValueDTO( + tag=TagDTO(name="openlineage_adapter.version"), + value="2.3.0", + ), + }, + ), + to_job=JobDTO( + name="mydag.next_task", + location=LocationDTO( + type="http", + name="airflow-host:8081", + addresses={"http://airflow-host:8081"}, + ), + type=JobTypeDTO(type="AIRFLOW_TASK"), + ), + type="DIRECT_DEPENDENCY", + ), + ], + ) diff --git a/tests/test_consumer/test_handlers/test_runs_handler_airflow.py b/tests/test_consumer/test_handlers/test_runs_handler_airflow.py index 327cbf1b..54a58010 100644 --- a/tests/test_consumer/test_handlers/test_runs_handler_airflow.py +++ b/tests/test_consumer/test_handlers/test_runs_handler_airflow.py @@ -17,6 +17,7 @@ DatasetSymlink, Input, Job, + JobDependency, Location, Operation, OperationStatus, @@ -118,6 +119,21 @@ async def test_runs_handler_airflow( assert {tv.tag.name: tv.value for tv in insert_task_job.tag_values} == expected_tag_values assert insert_task_job.parent_job_id == dag_job.id + job_dependency_query = select(JobDependency).order_by(JobDependency.from_job_id) + job_dependency_scalars = await async_session.scalars(job_dependency_query) + job_dependencies = job_dependency_scalars.all() + assert len(job_dependencies) == 2 + + create_to_insert_job_dependency = job_dependencies[0] + assert create_to_insert_job_dependency.from_job_id == create_task_job.id + assert create_to_insert_job_dependency.to_job_id == insert_task_job.id + assert create_to_insert_job_dependency.type == "DIRECT_DEPENDENCY" + + drop_to_create_job_dependency = job_dependencies[1] + assert drop_to_create_job_dependency.from_job_id == drop_task_job.id + assert drop_to_create_job_dependency.to_job_id == create_task_job.id + assert drop_to_create_job_dependency.type == "DIRECT_DEPENDENCY" + run_query = select(Run).order_by(Run.id) run_scalars = await async_session.scalars(run_query) runs = run_scalars.all() diff --git a/tests/test_consumer/test_handlers/test_runs_handler_dbt.py b/tests/test_consumer/test_handlers/test_runs_handler_dbt.py index 965affed..13c06237 100644 --- a/tests/test_consumer/test_handlers/test_runs_handler_dbt.py +++ b/tests/test_consumer/test_handlers/test_runs_handler_dbt.py @@ -17,6 +17,7 @@ DatasetSymlink, Input, Job, + JobDependency, Location, Operation, OperationStatus, @@ -88,6 +89,11 @@ async def test_runs_handler_dbt( "openlineage_adapter.version": "1.34.0", } + job_dependency_query = select(JobDependency).order_by(JobDependency.id) + job_dependency_scalars = await async_session.scalars(job_dependency_query) + job_dependencies = job_dependency_scalars.all() + assert not job_dependencies + run_query = select(Run).order_by(Run.id).options(selectinload(Run.started_by_user)) run_scalars = await async_session.scalars(run_query) runs = run_scalars.all() @@ -106,7 +112,7 @@ async def test_runs_handler_dbt( assert run.running_log_url is None assert run.persistent_log_url is None - sql_query_query = select(SQLQuery).order_by(SQLQuery.id) + sql_query_query = select(SQLQuery).order_by(SQLQuery.fingerprint) sql_euery_scalars = await async_session.scalars(sql_query_query) sql_queries = sql_euery_scalars.all() assert len(sql_queries) == 1 diff --git a/tests/test_consumer/test_handlers/test_runs_handler_flink.py b/tests/test_consumer/test_handlers/test_runs_handler_flink.py index ba86492e..40c8c84f 100644 --- a/tests/test_consumer/test_handlers/test_runs_handler_flink.py +++ b/tests/test_consumer/test_handlers/test_runs_handler_flink.py @@ -16,6 +16,7 @@ DatasetSymlink, Input, Job, + JobDependency, Location, Operation, OperationStatus, @@ -85,6 +86,11 @@ async def test_runs_handler_flink( "openlineage_adapter.version": "1.34.0", } + job_dependency_query = select(JobDependency).order_by(JobDependency.id) + job_dependency_scalars = await async_session.scalars(job_dependency_query) + job_dependencies = job_dependency_scalars.all() + assert not job_dependencies + run_query = select(Run).order_by(Run.id).options(selectinload(Run.started_by_user)) run_scalars = await async_session.scalars(run_query) runs = run_scalars.all() diff --git a/tests/test_consumer/test_handlers/test_runs_handler_hive.py b/tests/test_consumer/test_handlers/test_runs_handler_hive.py index 1ec20a78..4e731127 100644 --- a/tests/test_consumer/test_handlers/test_runs_handler_hive.py +++ b/tests/test_consumer/test_handlers/test_runs_handler_hive.py @@ -18,6 +18,7 @@ DatasetSymlinkType, Input, Job, + JobDependency, Location, Operation, OperationStatus, @@ -88,6 +89,11 @@ async def test_runs_handler_hive( "openlineage_adapter.version": "1.35.0", } + job_dependency_query = select(JobDependency).order_by(JobDependency.id) + job_dependency_scalars = await async_session.scalars(job_dependency_query) + job_dependencies = job_dependency_scalars.all() + assert not job_dependencies + run_query = select(Run).order_by(Run.id).options(selectinload(Run.started_by_user)) run_scalars = await async_session.scalars(run_query) runs = run_scalars.all() @@ -107,7 +113,7 @@ async def test_runs_handler_hive( assert session_run.running_log_url is None assert session_run.persistent_log_url is None - sql_query = select(SQLQuery).order_by(SQLQuery.id) + sql_query = select(SQLQuery).order_by(SQLQuery.fingerprint) sql_query_scalars = await async_session.scalars(sql_query) sql_queries = sql_query_scalars.all() assert len(sql_queries) == 1 diff --git a/tests/test_consumer/test_handlers/test_runs_handler_spark.py b/tests/test_consumer/test_handlers/test_runs_handler_spark.py index 86eae396..6d19eea9 100644 --- a/tests/test_consumer/test_handlers/test_runs_handler_spark.py +++ b/tests/test_consumer/test_handlers/test_runs_handler_spark.py @@ -18,6 +18,7 @@ DatasetSymlinkType, Input, Job, + JobDependency, Location, Operation, OperationStatus, @@ -88,6 +89,11 @@ async def test_runs_handler_spark( "openlineage_adapter.version": "1.19.0", } + job_dependency_query = select(JobDependency).order_by(JobDependency.id) + job_dependency_scalars = await async_session.scalars(job_dependency_query) + job_dependencies = job_dependency_scalars.all() + assert not job_dependencies + run_query = select(Run).order_by(Run.id).options(selectinload(Run.started_by_user)) run_scalars = await async_session.scalars(run_query) runs = run_scalars.all() @@ -107,7 +113,7 @@ async def test_runs_handler_spark( assert application_run.running_log_url == "http://127.0.0.1:4040" assert application_run.persistent_log_url is None - sql_query = select(SQLQuery).order_by(SQLQuery.id) + sql_query = select(SQLQuery).order_by(SQLQuery.fingerprint) sql_query_scalars = await async_session.scalars(sql_query) sql_queries = sql_query_scalars.all() assert len(sql_queries) == 1 diff --git a/tests/test_consumer/test_handlers/test_runs_handler_unknown.py b/tests/test_consumer/test_handlers/test_runs_handler_unknown.py index 84adf474..3d2ba084 100644 --- a/tests/test_consumer/test_handlers/test_runs_handler_unknown.py +++ b/tests/test_consumer/test_handlers/test_runs_handler_unknown.py @@ -17,6 +17,7 @@ DatasetSymlink, Input, Job, + JobDependency, Location, Operation, OperationStatus, @@ -85,6 +86,11 @@ async def test_runs_handler_unknown( assert job.location.addresses[0].url == "unknown://unknown" assert {tv.tag.name: tv.value for tv in jobs[0].tag_values} == {} + job_dependency_query = select(JobDependency).order_by(JobDependency.id) + job_dependency_scalars = await async_session.scalars(job_dependency_query) + job_dependencies = job_dependency_scalars.all() + assert not job_dependencies + run_query = select(Run).order_by(Run.id).options(selectinload(Run.started_by_user)) run_scalars = await async_session.scalars(run_query) runs = run_scalars.all()