Skip to content

Commit

Permalink
Merge branch 'main' into release-1.8.2
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana authored Jan 15, 2025
2 parents 2d2c7f6 + 104f8d4 commit e60a956
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 13 deletions.
13 changes: 8 additions & 5 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,16 +410,19 @@ def _get_dbt_dag_task_group_identifier(dag: DAG, task_group: TaskGroup | None) -
return dag_task_group_identifier


def should_create_detached_nodes(test_behavior: TestBehavior) -> bool:
def should_create_detached_nodes(render_config: RenderConfig) -> bool:
"""
Decide if we should calculate / insert detached nodes into the graph.
"""
return test_behavior in (TestBehavior.BUILD, TestBehavior.AFTER_EACH)
return render_config.should_detach_multiple_parents_tests and render_config.test_behavior in (
TestBehavior.BUILD,
TestBehavior.AFTER_EACH,
)


def identify_detached_nodes(
nodes: dict[str, DbtNode],
test_behavior: TestBehavior,
render_config: RenderConfig,
detached_nodes: dict[str, DbtNode],
detached_from_parent: dict[str, list[DbtNode]],
) -> None:
Expand All @@ -430,7 +433,7 @@ def identify_detached_nodes(
Change in-place the dictionaries detached_nodes (detached node ID : node) and detached_from_parent (parent node ID that
is upstream to this test and the test node).
"""
if should_create_detached_nodes(test_behavior):
if should_create_detached_nodes(render_config):
for node_id, node in nodes.items():
if is_detached_test(node):
detached_nodes[node_id] = node
Expand Down Expand Up @@ -504,7 +507,7 @@ def build_airflow_graph(
# have multiple parents
detached_nodes: dict[str, DbtNode] = OrderedDict()
detached_from_parent: dict[str, list[DbtNode]] = defaultdict(list)
identify_detached_nodes(nodes, test_behavior, detached_nodes, detached_from_parent)
identify_detached_nodes(nodes, render_config, detached_nodes, detached_from_parent)

for node_id, node in nodes.items():
conversion_function = node_converters.get(node.resource_type, generate_task_or_group)
Expand Down
2 changes: 2 additions & 0 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class RenderConfig:
:param source_rendering_behavior: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6).
:param airflow_vars_to_purge_dbt_ls_cache: Specify Airflow variables that will affect the LoadMode.DBT_LS cache.
:param normalize_task_id: A callable that takes a dbt node as input and returns the task ID. This allows users to assign a custom node ID separate from the display name.
:param should_detach_multiple_parents_tests: A boolean that allows users to decide whether to run tests with multiple parent dependencies in separate tasks.
"""

emit_datasets: bool = True
Expand All @@ -83,6 +84,7 @@ class RenderConfig:
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE
airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list)
normalize_task_id: Callable[..., Any] | None = None
should_detach_multiple_parents_tests: bool = False

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
if self.env_vars:
Expand Down
3 changes: 2 additions & 1 deletion dev/dags/example_tests_multiple_parents.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ProfileConfig, ProjectConfig
from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
Expand All @@ -31,4 +31,5 @@
# normal dag parameters
start_date=datetime(2023, 1, 1),
dag_id="example_multiple_parents_test",
render_config=RenderConfig(should_detach_multiple_parents_tests=True),
)
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 2 additions & 0 deletions docs/configuration/render-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ The ``RenderConfig`` class takes the following arguments:
- ``airflow_vars_to_purge_dbt_ls_cache``: (new in v1.5) Specify Airflow variables that will affect the ``LoadMode.DBT_LS`` cache. See `Caching <./caching.html>`_ for more information.
- ``source_rendering_behavior``: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6). See `Source Nodes Rendering <./source-nodes-rendering.html>`_ for more information.
- ``normalize_task_id``: A callable that takes a dbt node as input and returns the task ID. This function allows users to set a custom task_id independently of the model name, which can be specified as the task’s display_name. This way, task_id can be modified using a user-defined function, while the model name remains as the task’s display name. The display_name parameter is available in Airflow 2.9 and above. See `Task display name <./task-display-name.html>`_ for more information.
- ``load_method``: how to load your dbt project. See `Parsing Methods <parsing-methods.html>`_ for more information.
- ``should_detach_multiple_parents_tests``: A boolean to control if tests that depend on multiple parents should be run as standalone tasks. Defaults to ``False``. See `Testing Behavior <testing-behavior.html>`_ for more information.

Customizing how nodes are rendered (experimental)
-------------------------------------------------
Expand Down
74 changes: 72 additions & 2 deletions docs/configuration/testing-behavior.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ default behavior, which runs all models and tests, and then reports all failures
Cosmos supports the following test behaviors:

- ``after_each`` (default): turns each model into a task group with two steps: run the model, and run the tests
- ``build``: run dbt resources using the ``dbt build`` command, using a single task. This applies to dbt models, seeds and snapshots.
- ``build``: (since Cosmos 1.8) run dbt resources using the ``dbt build`` command, using a single task. This applies to dbt models, seeds and snapshots.
- ``after_all``: each model becomes a single task, and the tests only run if all models are run successfully
- ``none``: don't include tests

Expand All @@ -40,7 +40,7 @@ Example when changing the behavior to use ``TestBehavior.AFTER_ALL``:
.. image:: ../_static/test_behavior_after_all.png


Finally, an example DAG and how it is rendered in the Airflow UI when using ``TestBehavior.BUILD``:
Finally, an example DAG and how it is rendered in the Airflow UI when using ``TestBehavior.BUILD`` (available since Cosmos 1.8):

.. literalinclude:: ../../dev/dags/example_cosmos_dbt_build.py
:language: python
Expand Down Expand Up @@ -111,3 +111,73 @@ When at least one WARN message is present, the function passed to ``on_warning_c
If warnings that are not associated with tests occur (e.g. freshness warnings), they will still trigger the
``on_warning_callback`` method above. However, these warnings will not be included in the ``test_names`` and
``test_results`` context variables, which are specific to test-related warnings.


Tests with Multiple Parents
---------------------------

It is common for dbt projects to define tests that rely on multiple upstream models, snapshots or seeds.
By default, Cosmos will attempt to run these tests following the behavior defined by ``test_behavior``, as previously explained.

As an example, if there is a test that depends on multiple models (``model_a`` and ``combined_model``), and the DAG uses
``TestBehavior.AFTER_EACH``, Cosmos will attempt to run this test twice: once after each of the two models runs.

While the standard behavior of Cosmos works for many cases, there are a few scenarios when the test fails unless both models
run. To overcome this issue, starting in Cosmos 1.8.2, we introduced the parameter
``should_detach_multiple_parents_tests`` in ``RenderConfig``. By default, it is ``False``. If it is set to ``True`` and
``TestBehavior`` is ``AFTER_EACH`` or ``BUILD``, Cosmos will identify all the test nodes that depend on multiple parents
and will create a standalone test task for each of them.

Cosmos will attempt to name this task after the test's original name. However, since some test names can exceed 250 characters and Airflow does not support IDs longer than this limit, Cosmos will assign names like ``detached_0_test``, incrementing the number as needed.

The DAG `example_tests_multiple_parents <https://github.com/astronomer/astronomer-cosmos/blob/main/dev/dags/example_tests_multiple_parents.py>`_ illustrates this behavior.
It renders a dbt project named `multiple_parents_test <https://github.com/astronomer/astronomer-cosmos/tree/main/dev/dags/dbt/multiple_parents_test>`_ that has a test called `custom_test_combined_model <https://github.com/astronomer/astronomer-cosmos/blob/main/dev/dags/dbt/multiple_parents_test/macros/custom_test_combined_model.sql>`_ that depends on two models:

- **combined_model**
- **model_a**

By default, the DAG fails, because the test runs before both of its parent models have been created:

.. image:: ../_static/test_with_multiple_parents_failure.png

.. code-block::

    [2024-12-27T12:07:33.564+0000] {taskinstance.py:2905} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/venvpy39/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
        result = _execute_callable(context=context, **execute_callable_kwargs)
      File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/venvpy39/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
        return execute_callable(context=context, **execute_callable_kwargs)
      File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/venvpy39/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
        return func(self, *args, **kwargs)
      File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/cosmos/operators/local.py", line 796, in execute
        result = self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags())
      File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/cosmos/operators/local.py", line 654, in build_and_run_cmd
        result = self.run_command(cmd=dbt_cmd, env=env, context=context)
      File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/cosmos/operators/local.py", line 509, in run_command
        self.handle_exception(result)
      File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/cosmos/operators/local.py", line 237, in handle_exception_dbt_runner
        raise AirflowException(f"dbt invocation completed with errors: {error_message}")
    airflow.exceptions.AirflowException: dbt invocation completed with errors: custom_test_combined_model_combined_model_: Database Error in test custom_test_combined_model_combined_model_ (models/schema.yml)
      relation "public.combined_model" does not exist
      LINE 12: SELECT id FROM "postgres"."public"."combined_model"
                              ^
      compiled Code at target/run/my_dbt_project/models/schema.yml/custom_test_combined_model_combined_model_.sql

However, if users set ``should_detach_multiple_parents_tests=True``, the test will be detached, as illustrated below.
The test will only run once after both models run, leading the DAG to succeed:

.. code-block:: python

    from cosmos import DbtDag, RenderConfig

    example_multiple_parents_test = DbtDag(
        ...,
        render_config=RenderConfig(
            should_detach_multiple_parents_tests=True,
        ),
    )

.. image:: ../_static/test_with_multiple_parents_success.png
1 change: 0 additions & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

|fury| |ossrank| |downloads| |pre-commit|


Welcome to Astronomer Cosmos! Whether you're an experienced data practitioner or just getting started, Cosmos makes it
simple to manage and orchestrate your dbt workflows using `Apache Airflow® <https://airflow.apache.org/>`_, saving you
time and effort. By automatically turning dbt workflows into Airflow DAGs, Cosmos allows you to focus on building
Expand Down
55 changes: 51 additions & 4 deletions tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def test_converter_creates_dag_with_test_with_multiple_parents():
"""
project_config = ProjectConfig(dbt_project_path=MULTIPLE_PARENTS_TEST_DBT_PROJECT)
execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL)
render_config = RenderConfig(should_detach_multiple_parents_tests=True)
profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
Expand All @@ -183,7 +184,11 @@ def test_converter_creates_dag_with_test_with_multiple_parents():
)
with DAG("sample_dag", start_date=datetime(2024, 4, 16)) as dag:
converter = DbtToAirflowConverter(
dag=dag, project_config=project_config, profile_config=profile_config, execution_config=execution_config
dag=dag,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
render_config=render_config,
)
tasks = converter.tasks_map

Expand All @@ -209,14 +214,55 @@ def test_converter_creates_dag_with_test_with_multiple_parents():
)


@pytest.mark.integration
def test_converter_creates_dag_with_test_with_multiple_parents_with_should_detach_multiple_parents_tests_false():
"""
Validate topology of a project that uses the MULTIPLE_PARENTS_TEST_DBT_PROJECT project
"""
project_config = ProjectConfig(dbt_project_path=MULTIPLE_PARENTS_TEST_DBT_PROJECT)
execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL)
render_config = RenderConfig(should_detach_multiple_parents_tests=False)
profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="example_conn",
profile_args={"schema": "public"},
disable_event_tracking=True,
),
)
with DAG("sample_dag", start_date=datetime(2024, 4, 16)) as dag:
converter = DbtToAirflowConverter(
dag=dag,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
render_config=render_config,
)
tasks = converter.tasks_map

assert len(converter.tasks_map) == 3

# With detaching disabled, the test that depends on combined_model and model_a is NOT excluded from their commands
args = tasks["model.my_dbt_project.combined_model"].children["combined_model.test"].build_cmd({})[0]
assert args[1:] == ["test", "--models", "combined_model"]

args = tasks["model.my_dbt_project.model_a"].children["model_a.test"].build_cmd({})[0]
assert args[1:] == ["test", "--models", "model_a"]

# The test for model_b should not be changed, since it is not a parent of this test
args = tasks["model.my_dbt_project.model_b"].children["model_b.test"].build_cmd({})[0]
assert args[1:] == ["test", "--models", "model_b"]


@pytest.mark.integration
def test_converter_creates_dag_with_test_with_multiple_parents_test_afterall():
"""
Validate topology of a project that uses the MULTIPLE_PARENTS_TEST_DBT_PROJECT project
"""
project_config = ProjectConfig(dbt_project_path=MULTIPLE_PARENTS_TEST_DBT_PROJECT)
execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL)
render_config = RenderConfig(test_behavior=TestBehavior.AFTER_ALL)
render_config = RenderConfig(test_behavior=TestBehavior.AFTER_ALL, should_detach_multiple_parents_tests=True)
profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
Expand Down Expand Up @@ -255,7 +301,7 @@ def test_converter_creates_dag_with_test_with_multiple_parents_test_none():
"""
project_config = ProjectConfig(dbt_project_path=MULTIPLE_PARENTS_TEST_DBT_PROJECT)
execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL)
render_config = RenderConfig(test_behavior=TestBehavior.NONE)
render_config = RenderConfig(test_behavior=TestBehavior.NONE, should_detach_multiple_parents_tests=True)
profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
Expand Down Expand Up @@ -292,6 +338,7 @@ def test_converter_creates_dag_with_test_with_multiple_parents_and_build():
"""
project_config = ProjectConfig(dbt_project_path=MULTIPLE_PARENTS_TEST_DBT_PROJECT)
execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL)
render_config = RenderConfig(test_behavior=TestBehavior.BUILD, should_detach_multiple_parents_tests=True)
profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
Expand All @@ -307,7 +354,7 @@ def test_converter_creates_dag_with_test_with_multiple_parents_and_build():
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
render_config=RenderConfig(test_behavior=TestBehavior.BUILD),
render_config=render_config,
)
tasks = converter.tasks_map

Expand Down

0 comments on commit e60a956

Please sign in to comment.