
Commit 37429c4

feat: add Foundry pipeline
* feat: initial work to add Foundry pipeline for running DVE
* feat: amend Foundry pipeline to include exception handling, as it does not use steps; ensure that file transformation errors are persisted
* feat: some formatting; tweaked how errors are handled within file transformation
* test: re-enabled books behave tests
* docs: added some rationale around the Foundry pipeline
1 parent 7a774f6 commit 37429c4

File tree: 11 files changed, +320 -22 lines changed

src/dve/core_engine/exceptions.py

Lines changed: 12 additions & 1 deletion
@@ -14,7 +14,7 @@ def __init__(
         self, error_message: str, *args: object, messages: Messages, entities: SparkEntities
     ) -> None:
         super().__init__(error_message, *args)
-        self.error_messsage = error_message
+        self.error_message = error_message
         """The error message explaining the critical processing error."""
         self.messages = messages
         """The messages gathered at the time the error was emitted."""
@@ -26,6 +26,17 @@ def critical_messages(self) -> Iterator[FeedbackMessage]:
         """Critical messages which caused the processing error."""
         yield from filter(lambda message: message.is_critical, self.messages)
 
+    def to_feedback_message(self) -> FeedbackMessage:
+        "Convert to feedback message to write to json file"
+        return FeedbackMessage(
+            entity=None,
+            record=None,
+            failure_type="integrity",
+            error_type="processing",
+            error_location="Whole File",
+            error_message=self.error_message,
+        )
+
 
 class EntityTypeMismatch(TypeError):
     """An exception emitted if entity type outputs from two collaborative objects are different."""

src/dve/pipeline/duckdb_pipeline.py

Lines changed: 2 additions & 2 deletions
@@ -24,23 +24,23 @@ class DDBDVEPipeline(BaseDVEPipeline):
     def __init__(
         self,
         audit_tables: DDBAuditingManager,
-        job_run_id: int,
         connection: DuckDBPyConnection,
         rules_path: Optional[URI],
         processed_files_path: Optional[URI],
         submitted_files_path: Optional[URI],
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
+        job_run_id: Optional[int] = None,
     ):
         self._connection = connection
         super().__init__(
             audit_tables,
-            job_run_id,
             DuckDBDataContract(connection=self._connection),
             DuckDBStepImplementations.register_udfs(connection=self._connection),
             rules_path,
             processed_files_path,
             submitted_files_path,
             reference_data_loader,
+            job_run_id,
         )
 
     # pylint: disable=arguments-differ
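
Moving job_run_id to a trailing optional parameter is a breaking change for positional callers. A hypothetical construction under the new signature (the auditing manager and paths are placeholders, not part of this diff):

    # Hypothetical call site; audit_manager construction is not shown in this commit.
    from duckdb import connect

    conn = connect(database=":memory:")
    pipeline = DDBDVEPipeline(
        audit_tables=audit_manager,         # a DDBAuditingManager built elsewhere
        connection=conn,
        rules_path="rules/",
        processed_files_path="processed/",
        submitted_files_path="submitted/",
        # reference_data_loader and job_run_id both default to None now
    )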
Lines changed: 94 additions & 0 deletions
@@ -0,0 +1,94 @@
+"""A duckdb pipeline for running on Foundry platform"""
+
+from typing import Optional
+from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_get_entity_count, duckdb_write_parquet
+from dve.core_engine.backends.utilities import dump_errors
+from dve.core_engine.models import SubmissionInfo
+from dve.core_engine.type_hints import URI, Failed
+from dve.pipeline.duckdb_pipeline import DDBDVEPipeline
+from dve.pipeline.utils import SubmissionStatus
+from dve.parser import file_handling as fh
+
+class FoundryDDBPipeline(DDBDVEPipeline):
+    """DuckDB pipeline for running on the Foundry platform.
+    Subclassed to allow for exception handling when processing
+    single files sequentially through services."""
+
+    def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:
+        """Write out key audit relations to parquet for persisting to datasets"""
+        write_to = fh.joinuri(self.processed_files_path, submission_info.submission_id, "audit/")
+        self.write_parquet(
+            self._audit_tables._processing_status.get_relation(),
+            write_to + "processing_status.parquet",
+        )
+        self.write_parquet(
+            self._audit_tables._submission_statistics.get_relation(),
+            write_to + "submission_statistics.parquet",
+        )
+        return write_to
+
+    def file_transformation(
+        self, submission_info: SubmissionInfo
+    ) -> SubmissionInfo | dict[str, str]:
+        try:
+            return super().file_transformation(submission_info)
+        except Exception as exc:  # pylint: disable=W0718
+            self._logger.error(f"File transformation raised exception: {exc}")
+            self._logger.exception(exc)
+            return submission_info.dict()
+
+    def apply_data_contract(self, submission_info: SubmissionInfo) -> tuple[SubmissionInfo, bool]:
+        try:
+            return super().apply_data_contract(submission_info)
+        except Exception as exc:  # pylint: disable=W0718
+            self._logger.error(f"Apply data contract raised exception: {exc}")
+            self._logger.exception(exc)
+            return submission_info, True
+
+    def apply_business_rules(self, submission_info: SubmissionInfo, failed: Failed):
+        try:
+            return super().apply_business_rules(submission_info, failed)
+        except Exception as exc:  # pylint: disable=W0718
+            self._logger.error(f"Apply business rules raised exception: {exc}")
+            self._logger.exception(exc)
+            return submission_info, SubmissionStatus(failed=True)
+
+    def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], URI, URI]:
+        """Sequential single-submission pipeline runner"""
+        try:
+            sub_id: str = submission_info.submission_id
+            self._audit_tables.add_new_submissions(submissions=[submission_info])
+            self._audit_tables.mark_transform(submission_ids=[sub_id])
+            sub_info = self.file_transformation(submission_info=submission_info)
+            if isinstance(sub_info, SubmissionInfo):
+                self._audit_tables.mark_data_contract(submission_ids=[sub_id])
+                sub_info, failed = self.apply_data_contract(submission_info=submission_info)
+                self._audit_tables.mark_business_rules(submissions=[(sub_id, failed)])
+                sub_info, sub_status = self.apply_business_rules(
+                    submission_info=submission_info, failed=failed
+                )
+            else:
+                sub_status = SubmissionStatus(failed=True)
+            self._audit_tables.mark_error_report(
+                submissions=[(sub_id, sub_status.submission_result)]
+            )
+            sub_info, sub_status, sub_stats, report_uri = self.error_report(
+                submission_info=submission_info, status=sub_status
+            )
+            self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
+        except Exception as err:  # pylint: disable=W0718
+            self._logger.error(
+                f"During processing of submission_id: {sub_id}, the following exception was raised: {err}"
+            )
+            self._audit_tables.mark_failed(submissions=[sub_id])
+        finally:
+            audit_files_uri = self.persist_audit_records(submission_info=submission_info)
+            return (
+                (
+                    None
+                    if sub_status.failed
+                    else fh.joinuri(self.processed_files_path, sub_id, "business_rules")
+                ),
+                report_uri,
+                audit_files_uri,
+            )
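
run_pipeline drives one submission end to end: audit registration, file transformation, data contract, business rules, then error reporting, with each stage degrading to a failed status rather than aborting the run. The return value is (business-rules URI, or None on failure; report URI; audit URI). Note that the return in the finally block relies on sub_status and report_uri having been bound, so an exception raised before error_report completes would surface as an UnboundLocalError. A hypothetical driver, assuming the pipeline was constructed as in the DDBDVEPipeline example above and submission_info is a SubmissionInfo built elsewhere:

    # Hypothetical single-submission driver for Foundry.
    rules_uri, report_uri, audit_uri = pipeline.run_pipeline(submission_info)
    if rules_uri is None:
        # The submission failed somewhere; the error report and audit
        # parquet files are still written for inspection.
        print(f"submission failed, see {report_uri} and {audit_uri}")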

src/dve/pipeline/pipeline.py

Lines changed: 36 additions & 16 deletions
@@ -13,13 +13,19 @@
 import polars as pl
 from pydantic import validate_arguments
 
+from dve.core_engine.exceptions import CriticalProcessingError
+from dve.core_engine.message import FeedbackMessage
 import dve.reporting.excel_report as er
 from dve.core_engine.backends.base.auditing import BaseAuditingManager
 from dve.core_engine.backends.base.contract import BaseDataContract
 from dve.core_engine.backends.base.core import EntityManager
 from dve.core_engine.backends.base.reference_data import BaseRefDataLoader
 from dve.core_engine.backends.base.rules import BaseStepImplementations
-from dve.core_engine.backends.exceptions import MessageBearingError
+from dve.core_engine.backends.exceptions import (
+    BackendError,
+    MessageBearingError,
+    ReaderLacksEntityTypeSupport,
+)
 from dve.core_engine.backends.readers import BaseFileReader
 from dve.core_engine.backends.types import EntityType
 from dve.core_engine.backends.utilities import dump_errors, stringify_model
@@ -44,13 +50,13 @@ class BaseDVEPipeline:
     def __init__(
         self,
         audit_tables: BaseAuditingManager,
-        job_run_id: int,
         data_contract: BaseDataContract,
         step_implementations: Optional[BaseStepImplementations[EntityType]],
         rules_path: Optional[URI],
         processed_files_path: Optional[URI],
         submitted_files_path: Optional[URI],
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
+        job_run_id: Optional[int] = None,
     ):
         self._submitted_files_path = submitted_files_path
         self._processed_files_path = processed_files_path
@@ -265,30 +271,41 @@ def file_transformation(
         if not self.processed_files_path:
             raise AttributeError("processed files path not provided")
 
+        errors: list[FeedbackMessage] = []
         submission_file_uri: URI = fh.joinuri(
             self.processed_files_path,
             submission_info.submission_id,
             submission_info.file_name_with_ext,
         )
         try:
-            errors = self.write_file_to_parquet(
+            errors.extend(self.write_file_to_parquet(
                 submission_file_uri, submission_info, self.processed_files_path
-            )
-            if errors:
-                dump_errors(
-                    fh.joinuri(self.processed_files_path, submission_info.submission_id),
-                    "file_transformation",
-                    errors,
-                )
-                return submission_info.dict()
-            return submission_info
-        except ValueError as exc:
-            self._logger.error(f"File transformation write_file_to_parquet raised error: {exc}")
-            return submission_info.dict()
-        except Exception as exc:  # pylint: disable=broad-except
+            ))
+
+        except MessageBearingError as exc:
+            self._logger.error(f"Unexpected file transformation error: {exc}")
+            self._logger.exception(exc)
+            errors.extend(exc.messages)
+
+        except BackendError as exc:  # pylint: disable=broad-except
             self._logger.error(f"Unexpected file transformation error: {exc}")
             self._logger.exception(exc)
+            errors.extend([
+                CriticalProcessingError(
+                    entities=None,
+                    error_message=repr(exc),
+                    messages=[],
+                ).to_feedback_message()
+            ])
+
+        if errors:
+            dump_errors(
+                fh.joinuri(self.processed_files_path, submission_info.submission_id),
+                "file_transformation",
+                errors,
+            )
             return submission_info.dict()
+        return submission_info
@@ -321,6 +338,7 @@ def file_transformation_step(
         except Exception as exc:  # pylint: disable=W0703
             self._logger.error(f"File transformation raised exception: {exc}")
             self._logger.exception(exc)
+            # TODO: write errors to file here (maybe processing errors - not to be seen by end user)
             failed_processing.append(sub_info)
             continue
@@ -423,6 +441,7 @@ def data_contract_step(
         except Exception as exc:  # pylint: disable=W0703
             self._logger.error(f"Data Contract raised exception: {exc}")
             self._logger.exception(exc)
+            # TODO: write errors to file here (maybe processing errors - not to be seen by end user)
             failed_processing.append(sub_info)
             continue
@@ -562,6 +581,7 @@ def business_rule_step(
         except Exception as exc:  # pylint: disable=W0703
             self._logger.error(f"Business Rules raised exception: {exc}")
             self._logger.exception(exc)
+            # TODO: write errors to file here (maybe processing errors - not to be seen by end user)
             failed_processing.append(sub_info)
             continue
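
The reworked file_transformation replaces per-branch returns with a single accumulate-then-dump flow: a MessageBearingError contributes its own structured messages, any other BackendError is wrapped into one whole-file message via to_feedback_message, and dump_errors is called exactly once at the end. A standalone sketch of that pattern using stand-in types rather than the DVE API:

    # Stand-in types to illustrate the accumulate-then-dump flow above.
    class MessageBearing(Exception):
        def __init__(self, messages: list[str]):
            super().__init__("file transformation failed")
            self.messages = messages

    def transform(path: str) -> list[str]:
        raise MessageBearing([f"bad header in {path}"])

    errors: list[str] = []
    try:
        errors.extend(transform("movies.csv"))
    except MessageBearing as exc:
        errors.extend(exc.messages)    # structured feedback travels with the error
    except Exception as exc:           # anything else becomes one whole-file error
        errors.append(repr(exc))

    if errors:                         # single dump site instead of per-branch returns
        print("file_transformation errors:", errors)   # stand-in for dump_errors(...)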

src/dve/pipeline/spark_pipeline.py

Lines changed: 2 additions & 2 deletions
@@ -26,23 +26,23 @@ class SparkDVEPipeline(BaseDVEPipeline):
     def __init__(
         self,
         audit_tables: SparkAuditingManager,
-        job_run_id: int,
         rules_path: Optional[URI],
         processed_files_path: Optional[URI],
         submitted_files_path: Optional[URI],
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
         spark: Optional[SparkSession] = None,
+        job_run_id: Optional[int] = None,
     ):
         self._spark = spark if spark else SparkSession.builder.getOrCreate()
         super().__init__(
             audit_tables,
-            job_run_id,
             SparkDataContract(spark_session=self._spark),
             SparkStepImplementations.register_udfs(self._spark),
             rules_path,
             processed_files_path,
             submitted_files_path,
             reference_data_loader,
+            job_run_id,
         )
 
     # pylint: disable=arguments-differ
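
The Spark constructor gets the same reshuffle; passing job_run_id by keyword sidesteps the positional change entirely. A hypothetical call site (audit_manager and paths are placeholders):

    # Hypothetical call site; audit_manager is a SparkAuditingManager built elsewhere.
    pipeline = SparkDVEPipeline(
        audit_tables=audit_manager,
        rules_path="rules/",
        processed_files_path="processed/",
        submitted_files_path="submitted/",
        job_run_id=42,    # optional; omit when no external job run exists
    )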

tests/fixtures.py

Lines changed: 1 addition & 0 deletions
@@ -120,4 +120,5 @@ def temp_ddb_conn() -> Iterator[Tuple[Path, DuckDBPyConnection]]:
     with tempfile.TemporaryDirectory(prefix="ddb_audit_testing") as tmp:
         db_file = Path(tmp, db + ".duckdb")
         conn = connect(database=db_file, read_only=False)
+
         yield db_file, conn

tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_rules.py

Lines changed: 1 addition & 1 deletion
@@ -457,7 +457,7 @@ def test_one_to_one_join_multi_matches_raises(
         new_columns={"satellites.name": "satellite"},
     )
     entities = EntityManager({"planets": planets_rel, "satellites": satellites_rel})
-    with pytest.raises(ValueError, match="Multiple matches for some records.+"):
+    with pytest.raises(ValueError, match="Multiple matches for some records.*"):
         DUCKDB_STEP_BACKEND.one_to_one_join(entities, config=join)
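
pytest.raises(match=...) applies re.search to the exception's string representation, so the old .+ required at least one character after the literal prefix and failed when the message ended exactly at "records". Loosening it to .* accepts both forms:

    import re

    msg = "Multiple matches for some records"  # message with nothing after the prefix
    assert re.search("Multiple matches for some records.+", msg) is None      # old pattern misses
    assert re.search("Multiple matches for some records.*", msg) is not None  # new pattern matches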

tests/test_pipeline/pipeline_helpers.py

Lines changed: 7 additions & 0 deletions
@@ -66,6 +66,13 @@ def planet_test_files() -> Iterator[str]:
         shutil.copytree(get_test_file_path("planets/"), Path(tdir, "planets"))
         yield tdir + "/planets"
 
+@pytest.fixture(scope="function")
+def movies_test_files() -> Iterator[str]:
+    clear_config_cache()
+    with tempfile.TemporaryDirectory() as tdir:
+        shutil.copytree(get_test_file_path("movies/"), Path(tdir, "movies"))
+        yield tdir + "/movies"
+
 
 @pytest.fixture(scope="function")
 def planet_data_after_file_transformation() -> Iterator[Tuple[SubmissionInfo, str]]:
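
The new movies_test_files fixture mirrors planet_test_files: a test requests it by name and receives a throwaway copy of the test data. A hypothetical consumer:

    # Hypothetical test; the fixture yields a temp-dir path holding a copy of movies/.
    from pathlib import Path

    def test_movies_files_copied(movies_test_files: str):
        assert Path(movies_test_files).is_dir()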

tests/test_pipeline/test_duckdb_pipeline.py

Lines changed: 1 addition & 0 deletions
@@ -4,6 +4,7 @@
 
 from concurrent.futures import ThreadPoolExecutor
 from pathlib import Path
+import shutil
 from typing import Dict, Tuple
 from uuid import uuid4
