Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions data_rentgen/consumer/extractors/batch_extraction_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
DatasetDTO,
DatasetSymlinkDTO,
InputDTO,
JobDependencyDTO,
JobDTO,
JobTypeDTO,
LocationDTO,
Expand All @@ -29,6 +30,7 @@
DatasetDTO,
DatasetSymlinkDTO,
InputDTO,
JobDependencyDTO,
JobDTO,
JobTypeDTO,
LocationDTO,
Expand Down Expand Up @@ -67,6 +69,7 @@ def __init__(self):
self._dataset_symlinks: dict[tuple, DatasetSymlinkDTO] = {}
self._job_types: dict[tuple, JobTypeDTO] = {}
self._jobs: dict[tuple, JobDTO] = {}
self._job_dependencies: dict[tuple, JobDependencyDTO] = {}
self._runs: dict[tuple, RunDTO] = {}
self._operations: dict[tuple, OperationDTO] = {}
self._inputs: dict[tuple, InputDTO] = {}
Expand All @@ -86,6 +89,7 @@ def __repr__(self):
f"dataset_symlinks={len(self._dataset_symlinks)}, "
f"job_types={len(self._job_types)}, "
f"jobs={len(self._jobs)}, "
f"job_dependencies={len(self._job_dependencies)}, "
f"runs={len(self._runs)}, "
f"operations={len(self._operations)}, "
f"inputs={len(self._inputs)}, "
Expand Down Expand Up @@ -139,12 +143,19 @@ def add_job(self, job: JobDTO):
job.tag_values = {self.add_tag_value(tag_value) for tag_value in job.tag_values}
return self._add(self._jobs, job)

def add_job_dependency(self, job_dependency: JobDependencyDTO):
    """Register a job dependency, deduplicating both endpoint jobs first.

    Each referenced job is funneled through ``add_job`` so the dependency
    points at the canonical JobDTO instances stored in this batch.
    """
    canonical_from = self.add_job(job_dependency.from_job)
    canonical_to = self.add_job(job_dependency.to_job)
    job_dependency.from_job = canonical_from
    job_dependency.to_job = canonical_to
    return self._add(self._job_dependencies, job_dependency)

def add_run(self, run: RunDTO):
    """Register a run and everything it references, returning the deduplicated instance.

    The run's job, optional parent run (recursively), optional user, and any
    job dependencies are each routed through their own ``add_*`` method, so
    every nested DTO is replaced in-place by the single canonical instance
    stored in this batch.
    """
    run.job = self.add_job(run.job)
    if run.parent_run:
        # Parent runs are registered recursively, deduplicating their jobs too.
        run.parent_run = self.add_run(run.parent_run)
    if run.user:
        run.user = self.add_user(run.user)
    for job_dependency in run.job_dependencies:
        self.add_job_dependency(job_dependency)
    return self._add(self._runs, run)

def add_operation(self, operation: OperationDTO):
Expand Down Expand Up @@ -232,6 +243,12 @@ def get_job(self, job_key: tuple) -> JobDTO:
job.tag_values = {self.get_tag_value(tag_value.unique_key) for tag_value in job.tag_values}
return job

def get_job_dependency(self, job_dependency_key: tuple) -> JobDependencyDTO:
    """Return the stored dependency with both job references resolved to canonical DTOs.

    Raises KeyError if the key was never registered via ``add_job_dependency``.
    """
    dependency = self._job_dependencies[job_dependency_key]
    dependency.from_job = self.get_job(dependency.from_job.unique_key)
    dependency.to_job = self.get_job(dependency.to_job.unique_key)
    return dependency

def get_run(self, run_key: tuple) -> RunDTO:
run = self._runs[run_key]
run.job = self.get_job(run.job.unique_key)
Expand Down Expand Up @@ -290,6 +307,9 @@ def job_types(self) -> list[JobTypeDTO]:
def jobs(self) -> list[JobDTO]:
return self._resolve(self.get_job, self._jobs)

def job_dependencies(self) -> list[JobDependencyDTO]:
    # Materialize all collected job dependencies, with nested job references
    # resolved to their canonical (deduplicated) instances.
    return self._resolve(self.get_job_dependency, self._job_dependencies)

def runs(self) -> list[RunDTO]:
return self._resolve(self.get_run, self._runs)

Expand Down
11 changes: 9 additions & 2 deletions data_rentgen/consumer/extractors/generic/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)
from data_rentgen.openlineage.job import OpenLineageJob
from data_rentgen.openlineage.run_facets import (
OpenLineageJobIdentifier,
OpenLineageParentJob,
)

Expand All @@ -29,7 +30,10 @@ def extract_job(self, job: OpenLineageJob) -> JobDTO:
)
return self._enrich_job_tags(job_dto, job)

def extract_parent_job(self, job: OpenLineageJob | OpenLineageParentJob) -> JobDTO:
def extract_pure_job(
self,
job: OpenLineageJob | OpenLineageParentJob | OpenLineageJobIdentifier,
) -> JobDTO:
"""
Extract JobDTO from parent job reference
"""
Expand All @@ -38,7 +42,10 @@ def extract_parent_job(self, job: OpenLineageJob | OpenLineageParentJob) -> JobD
location=self._extract_job_location(job),
)

def _extract_job_location(self, job: OpenLineageJob | OpenLineageParentJob) -> LocationDTO:
def _extract_job_location(
self,
job: OpenLineageJob | OpenLineageParentJob | OpenLineageJobIdentifier,
) -> LocationDTO:
# hostname and scheme are normalized to lowercase for uniqueness
url = urlparse(job.namespace.lower())
scheme = url.scheme or "unknown"
Expand Down
46 changes: 38 additions & 8 deletions data_rentgen/consumer/extractors/generic/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from abc import ABC, abstractmethod

from data_rentgen.dto import (
JobDependencyDTO,
JobDTO,
RunDTO,
RunStatusDTO,
Expand All @@ -17,10 +18,11 @@
OpenLineageRunEventType,
)
from data_rentgen.openlineage.run_facets import (
OpenLineageJobIdentifier,
OpenLineageParentJob,
OpenLineageParentRunFacet,
OpenLineageRunTagsFacetField,
)
from data_rentgen.openlineage.run_facets.run_tags import OpenLineageRunTagsFacetField


class RunExtractorMixin(ABC):
Expand All @@ -29,18 +31,14 @@ def extract_job(self, job: OpenLineageJob) -> JobDTO:
pass

@abstractmethod
def extract_parent_job(self, job: OpenLineageJob | OpenLineageParentJob) -> JobDTO:
def extract_pure_job(self, job: OpenLineageJob | OpenLineageParentJob | OpenLineageJobIdentifier) -> JobDTO:
pass

def extract_run(self, event: OpenLineageRunEvent) -> RunDTO:
"""
Extract RunDTO from specific event
"""
run = RunDTO(
id=event.run.runId, # type: ignore [arg-type]
job=self.extract_job(event.job),
parent_run=self.extract_parent_run(event.run.facets.parent) if event.run.facets.parent else None,
)
run = self.extract_pure_run(event)
if run.parent_run:
run.job.parent_job = run.parent_run.job
self._enrich_run_status(run, event)
Expand All @@ -49,15 +47,23 @@ def extract_run(self, event: OpenLineageRunEvent) -> RunDTO:
self._add_openlineage_client_version_tag(run, event)
self._enrich_run_tags(run, event)
self._enrich_nominal_times(run, event)
self._enrich_job_dependencies(run, event)
return run

def extract_pure_run(self, event: OpenLineageRunEvent) -> RunDTO:
    """Build a bare RunDTO (id, job, optional parent run) without any facet enrichment."""
    parent_facet = event.run.facets.parent
    parent_run = self.extract_parent_run(parent_facet) if parent_facet else None
    return RunDTO(
        id=event.run.runId,  # type: ignore [arg-type]
        job=self.extract_job(event.job),
        parent_run=parent_run,
    )

def extract_parent_run(self, facet: OpenLineageParentRunFacet | OpenLineageRunEvent) -> RunDTO:
    """Extract a minimal RunDTO (id + job only) from a parent run reference."""
    parent_job = self.extract_pure_job(facet.job)
    return RunDTO(id=facet.run.runId, job=parent_job)

def _enrich_run_status(self, run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
Expand Down Expand Up @@ -146,3 +152,27 @@ def _enrich_nominal_times(self, run: RunDTO, event: OpenLineageRunEvent) -> RunD
run.expected_end_at = None

return run

def _enrich_job_dependencies(self, run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
    """Append job dependencies declared by the ``jobDependencies`` run facet, if present.

    Upstream entries become dependencies feeding into this run's job;
    downstream entries become dependencies flowing out of it.
    """
    facet = event.run.facets.jobDependencies
    if not facet:
        return run

    run.job_dependencies.extend(
        JobDependencyDTO(
            from_job=self.extract_pure_job(entry.job),
            to_job=run.job,
            type=entry.dependency_type,
        )
        for entry in facet.upstream
    )
    run.job_dependencies.extend(
        JobDependencyDTO(
            from_job=run.job,
            to_job=self.extract_pure_job(entry.job),
            type=entry.dependency_type,
        )
        for entry in facet.downstream
    )
    return run
36 changes: 35 additions & 1 deletion data_rentgen/consumer/extractors/impl/airflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

from data_rentgen.consumer.extractors.generic import GenericExtractor
from data_rentgen.consumer.extractors.impl.utils import parse_kv_tag
from data_rentgen.dto import JobDTO, OperationDTO, RunDTO, RunStartReasonDTO, UserDTO
from data_rentgen.dto import JobDTO, JobTypeDTO, OperationDTO, RunDTO, RunStartReasonDTO, UserDTO
from data_rentgen.dto.job_dependency import JobDependencyDTO
from data_rentgen.openlineage.job import OpenLineageJob
from data_rentgen.openlineage.job_facets import OpenLineageJobTagsFacetField
from data_rentgen.openlineage.run_event import OpenLineageRunEvent
Expand Down Expand Up @@ -211,3 +212,36 @@ def _enrich_run_tags(self, run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
# facets are immutable
object.__setattr__(event.run.facets, "tags", OpenLineageRunTagsFacet(tags=run_tags))
return super()._enrich_run_tags(run, event)

def _enrich_job_dependencies(self, run: RunDTO, event: OpenLineageRunEvent) -> RunDTO:
    """Add task-level dependencies from the Airflow run facet on top of the generic ones.

    Upstream/downstream task ids from the ``airflow`` run facet are converted
    into JobDependencyDTOs between sibling tasks of the same DAG.
    """
    run = super()._enrich_job_dependencies(run, event)
    if not event.run.facets.airflow:
        return run

    # https://github.com/apache/airflow/pull/59521 brings up jobDependency facet,
    # but it still doesn't contain direct task -> task dependencies
    dag_info = event.run.facets.airflow.dag
    task_info = event.run.facets.airflow.task

    def _sibling_task_job(task_id: str) -> JobDTO:
        # Sibling tasks belong to the same DAG, so they share this run's job location.
        return JobDTO(
            name=f"{dag_info.dag_id}.{task_id}",
            location=run.job.location,
            type=JobTypeDTO("AIRFLOW_TASK"),
        )

    for upstream_task_id in task_info.upstream_task_ids:
        run.job_dependencies.append(
            JobDependencyDTO(
                from_job=_sibling_task_job(upstream_task_id),
                to_job=run.job,
                type="DIRECT_DEPENDENCY",
            ),
        )
    for downstream_task_id in task_info.downstream_task_ids:
        run.job_dependencies.append(
            JobDependencyDTO(
                from_job=run.job,
                to_job=_sibling_task_job(downstream_task_id),
                type="DIRECT_DEPENDENCY",
            ),
        )
    return run
12 changes: 4 additions & 8 deletions data_rentgen/consumer/extractors/impl/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def is_operation(self, event: OpenLineageRunEvent) -> bool:
# All events produced by Hive integration are queries == operations
return True

def extract_run(self, event: OpenLineageRunEvent) -> RunDTO:
def extract_pure_run(self, event: OpenLineageRunEvent) -> RunDTO:
# Hive produce only hive query events, but no events for hive session:
# https://github.com/OpenLineage/OpenLineage/issues/3784
# But we treat queries as operations, and operations should be bound to run (session) for grouping.
Expand All @@ -55,7 +55,7 @@ def extract_run(self, event: OpenLineageRunEvent) -> RunDTO:
if hive_session.username not in ("anonymous", "hive"):
user = UserDTO(name=hive_session.username)

run = RunDTO(
return RunDTO(
id=run_id,
job=JobDTO(
name=job_name,
Expand All @@ -68,12 +68,8 @@ def extract_run(self, event: OpenLineageRunEvent) -> RunDTO:
external_id=hive_session.sessionId,
user=user,
)
if run.parent_run:
run.job.parent_job = run.parent_run.job
self._add_engine_version_tag(run, event)
self._add_openlineage_adapter_version_tag(run, event)
self._add_openlineage_client_version_tag(run, event)
self._enrich_run_tags(run, event)

def _enrich_run_status(self, run: RunDTO, event: OpenLineageRunEvent):
    # Intentional no-op override: for Hive, the run represents a session while
    # each event describes a single query (treated as an operation), so the
    # query's status must not be propagated onto the session-level run.
    return run

def extract_operation(self, event: OpenLineageRunEvent) -> OperationDTO:
Expand Down
10 changes: 10 additions & 0 deletions data_rentgen/consumer/saver.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ async def save(self, data: BatchExtractionResult):
await self.create_dataset_symlinks(data)
await self.create_job_types(data)
await self.create_jobs(data)
await self.create_job_dependencies(data)
await self.create_users(data)
await self.create_sql_queries(data)
await self.create_schemas(data)
Expand Down Expand Up @@ -113,6 +114,15 @@ async def create_jobs(self, data: BatchExtractionResult):
job = await self.unit_of_work.job.update(job, job_dto) # noqa: PLW2901
job_dto.id = job.id

async def create_job_dependencies(self, data: BatchExtractionResult):
    """Persist job dependency DTOs, inserting only those missing from the database."""
    self.logger.debug("Creating job dependencies")
    # fetch_bulk yields (dto, existing-row-or-None) pairs for the whole batch.
    job_dependency_pairs = await self.unit_of_work.job_dependency.fetch_bulk(data.job_dependencies())
    for job_dependency_dto, job_dependency in job_dependency_pairs:
        if not job_dependency:
            # Missing rows are created each within its own unit-of-work scope.
            async with self.unit_of_work:
                job_dependency = await self.unit_of_work.job_dependency.create(job_dependency_dto)  # noqa: PLW2901
        # Propagate the database id back onto the DTO for downstream steps.
        job_dependency_dto.id = job_dependency.id

async def create_users(self, data: BatchExtractionResult):
self.logger.debug("Creating users")
user_pairs = await self.unit_of_work.user.fetch_bulk(data.users())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# SPDX-FileCopyrightText: 2024-present MTS PJSC
# SPDX-License-Identifier: Apache-2.0
"""Add job_dependency table

Revision ID: 4e119cb7481e
Revises: a1950f06a8cb
Create Date: 2026-03-06 17:39:31.296534

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "4e119cb7481e"
down_revision = "a1950f06a8cb"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Create the ``job_dependency`` table: FKs to ``job``, unique (from, to) pair, lookup indexes."""
    op.create_table(
        "job_dependency",
        sa.Column("id", sa.BigInteger(), nullable=False),
        sa.Column("from_job_id", sa.BigInteger(), nullable=False),
        sa.Column("to_job_id", sa.BigInteger(), nullable=False),
        # NOTE(review): nullable — presumably some producers omit the dependency type; confirm.
        sa.Column("type", sa.String(), nullable=True),
        # Deleting a job removes all dependencies referencing it (both directions).
        sa.ForeignKeyConstraint(
            ["from_job_id"],
            ["job.id"],
            name=op.f("fk__job_dependency__from_job_id__job"),
            ondelete="CASCADE",
        ),
        sa.ForeignKeyConstraint(
            ["to_job_id"],
            ["job.id"],
            name=op.f("fk__job_dependency__to_job_id__job"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk__job_dependency")),
        # At most one dependency row per ordered (from_job, to_job) pair.
        sa.UniqueConstraint("from_job_id", "to_job_id", name=op.f("uq__job_dependency__from_job_id_to_job_id")),
    )
    op.create_index(op.f("ix__job_dependency__from_job_id"), "job_dependency", ["from_job_id"], unique=False)
    op.create_index(op.f("ix__job_dependency__to_job_id"), "job_dependency", ["to_job_id"], unique=False)


def downgrade() -> None:
    """Drop the ``job_dependency`` table together with its supporting indexes."""
    # Indexes are dropped first (same order as the original migration), then the table.
    for column in ("to_job_id", "from_job_id"):
        op.drop_index(op.f(f"ix__job_dependency__{column}"), table_name="job_dependency")
    op.drop_table("job_dependency")
2 changes: 2 additions & 0 deletions data_rentgen/db/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from data_rentgen.db.models.dataset_symlink import DatasetSymlink, DatasetSymlinkType
from data_rentgen.db.models.input import Input
from data_rentgen.db.models.job import Job, JobTagValue
from data_rentgen.db.models.job_dependency import JobDependency
from data_rentgen.db.models.job_type import JobType
from data_rentgen.db.models.location import Location
from data_rentgen.db.models.operation import Operation, OperationStatus, OperationType
Expand All @@ -40,6 +41,7 @@
"DatasetTagValue",
"Input",
"Job",
"JobDependency",
"JobLastRun",
"JobTagValue",
"JobType",
Expand Down
Loading