Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ security-scan:
APPSECHUB_PARENT_PIPELINE_ID: $CI_PIPELINE_ID
APPSECHUB_SCA_SBOM_GENERATOR: custom
APPSECHUB_SBOM_PATH: sbom.cyclonedx.json
APPSECHUB_SBOM_MASK: "*bom*.json"
APPSECHUB_SBOM_MASK: '*bom*.json'
CUSTOM_SBOM_GENERATOR_JOB_NAME: sbom-creation
rules:
- if: $CI_COMMIT_REF_NAME == $CI_DEFAULT_BRANCH
Expand Down
23 changes: 14 additions & 9 deletions data_rentgen/server/services/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ async def get_lineage_by_runs( # noqa: C901, PLR0915, PLR0912
runs_by_id = {run.id: run for run in runs}
# Include child runs
if level == 0:
relations = await self._uow.run.list_descendant_relations(start_node_ids)
child_runs_ids = {c_id for _, c_id in relations}
run_relations = await self._uow.run.list_descendant_relations(start_node_ids)
child_runs_ids = {c_id for _, c_id in run_relations}
child_runs = await self._uow.run.list_by_ids(child_runs_ids)
runs.extend(child_runs)
child_runs_by_id = {run.id: run for run in child_runs}
Expand Down Expand Up @@ -413,7 +413,10 @@ async def get_lineage_by_runs( # noqa: C901, PLR0915, PLR0912
},
)
if level == 0:
result.run_ancestor_relations.update({tuple(r) for r in relations})
result.run_ancestor_relations.update({tuple(r) for r in run_relations})

job_relations = await self._uow.job.list_ancestor_relations(job_ids)
result.job_ancestor_relations.update({tuple(r) for r in job_relations})

upstream_dataset_ids = {input_.dataset_id for input_ in inputs} - ids_to_skip.datasets
downstream_dataset_ids = {output.dataset_id for output in outputs} - ids_to_skip.datasets
Expand Down Expand Up @@ -1334,26 +1337,28 @@ async def _populate_parents(
"""Returns a LineageServiceResult with only run_ancestor_relations and/or job_ancestor_relations populated, plus the runs/jobs fetched for those relations."""
match granularity:
case "RUN" | "OPERATION":
relations = await self._uow.run.list_ancestor_relations(result.runs.keys())
parents_run_ids = {p_id for p_id, _ in relations}
run_relations = await self._uow.run.list_ancestor_relations(result.runs.keys())
job_relations = await self._uow.job.list_ancestor_relations(result.jobs.keys())
parents_run_ids = {p_id for p_id, _ in run_relations}
runs = await self._uow.run.list_by_ids(parents_run_ids)
runs_by_id = {run.id: run for run in runs}
job_ids = {run.job_id for run in runs}
jobs = await self._uow.job.list_by_ids(job_ids)
jobs_by_id = {job.id: job for job in jobs}
return LineageServiceResult(
run_ancestor_relations={tuple(r) for r in relations},
run_ancestor_relations={tuple(r) for r in run_relations},
job_ancestor_relations={tuple(r) for r in job_relations},
runs=runs_by_id,
jobs=jobs_by_id,
)

case "JOB":
relations = await self._uow.job.list_ancestor_relations(result.jobs.keys())
parents_job_ids = {p_id for p_id, _ in relations}
job_relations = await self._uow.job.list_ancestor_relations(result.jobs.keys())
parents_job_ids = {p_id for p_id, _ in job_relations}
jobs = await self._uow.job.list_by_ids(parents_job_ids)
jobs_by_id = {job.id: job for job in jobs}
return LineageServiceResult(
job_ancestor_relations={tuple(r) for r in relations},
job_ancestor_relations={tuple(r) for r in job_relations},
jobs=jobs_by_id,
)

Expand Down
4 changes: 2 additions & 2 deletions data_rentgen/server/utils/lineage_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ def build_lineage_response(lineage: LineageServiceResult) -> LineageResponseV1:
),
relations=LineageRelationsResponseV1(
parents=_get_run_parent_relations(lineage.runs) + _get_operation_parent_relations(lineage.operations),
ancestors=_get_runs_hierarchy_chain(lineage.run_ancestor_relations)
+ _get_jobs_hierarchy_chain(lineage.job_ancestor_relations),
ancestors=_get_jobs_hierarchy_chain(lineage.job_ancestor_relations)
+ _get_runs_hierarchy_chain(lineage.run_ancestor_relations),
symlinks=_get_symlink_relations(lineage.dataset_symlinks),
inputs=_get_input_relations(lineage.inputs),
outputs=_get_output_relations(lineage.outputs),
Expand Down
2 changes: 1 addition & 1 deletion tests/test_server/test_lineage/test_job_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@ async def test_get_job_lineage_with_granularity_run_and_ancestor_relations(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs),
"ancestors": runs_ancestors_to_json(runs),
"ancestors": jobs_ancestors_to_json(jobs) + runs_ancestors_to_json(runs),
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(lineage.inputs), granularity="JOB"),
Expand Down
3 changes: 2 additions & 1 deletion tests/test_server/test_lineage/test_operation_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from tests.test_server.utils.convert_to_json import (
datasets_to_json,
inputs_to_json,
jobs_ancestors_to_json,
jobs_to_json,
operation_parents_to_json,
operations_to_json,
Expand Down Expand Up @@ -961,7 +962,7 @@ async def test_get_operation_lineage_with_run_and_ancestor_relations(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs) + operation_parents_to_json(lineage.operations),
"ancestors": runs_ancestors_to_json(runs),
"ancestors": jobs_ancestors_to_json(jobs) + runs_ancestors_to_json(runs),
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(lineage.inputs), granularity="JOB"),
Expand Down
7 changes: 4 additions & 3 deletions tests/test_server/test_lineage/test_run_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from tests.test_server.utils.convert_to_json import (
datasets_to_json,
inputs_to_json,
jobs_ancestors_to_json,
jobs_to_json,
operation_parents_to_json,
operations_to_json,
Expand Down Expand Up @@ -1264,7 +1265,7 @@ async def test_get_run_lineage_run_with_ancestor_relations(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs),
"ancestors": runs_ancestors_to_json(runs),
"ancestors": jobs_ancestors_to_json(jobs) + runs_ancestors_to_json(runs),
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(lineage.inputs), granularity="JOB"),
Expand Down Expand Up @@ -1313,7 +1314,7 @@ async def test_runs_with_granularity_operation_and_ancestor_relations(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs) + operation_parents_to_json(lineage.operations),
"ancestors": runs_ancestors_to_json(runs),
"ancestors": jobs_ancestors_to_json(jobs) + runs_ancestors_to_json(runs),
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(lineage.inputs), granularity="JOB"),
Expand Down Expand Up @@ -1360,7 +1361,7 @@ async def test_get_run_lineage_run_with_descendant_relations(
assert response.json() == {
"relations": {
"parents": run_parents_to_json(runs),
"ancestors": runs_ancestors_to_json(runs),
"ancestors": jobs_ancestors_to_json(jobs) + runs_ancestors_to_json(runs),
"symlinks": [],
"inputs": [
*inputs_to_json(merge_io_by_jobs(lineage.inputs), granularity="JOB"),
Expand Down