29
29
from _pytask .node_protocols import PPathNode
30
30
from _pytask .node_protocols import PTask
31
31
from _pytask .node_protocols import PTaskWithPath
32
- from _pytask .path import find_common_ancestor_of_nodes
32
+ from _pytask .nodes import PythonNode
33
33
from _pytask .report import DagReport
34
34
from _pytask .shared import reduce_names_of_multiple_nodes
35
35
from _pytask .shared import reduce_node_name
@@ -87,6 +87,16 @@ def pytask_dag_create_dag(tasks: list[PTask]) -> nx.DiGraph:
87
87
tree_map (lambda x : dag .add_node (x .name , node = x ), task .produces )
88
88
tree_map (lambda x : dag .add_edge (task .name , x .name ), task .produces )
89
89
90
+ # If a node is a PythonNode wrapped in another PythonNode, it is a product from
91
+ # another task that is a dependency in the current task. Thus, draw an edge
92
+ # connecting the two nodes.
93
+ tree_map (
94
+ lambda x : dag .add_edge (x .value .name , x .name )
95
+ if isinstance (x , PythonNode ) and isinstance (x .value , PythonNode )
96
+ else None ,
97
+ task .depends_on ,
98
+ )
99
+
90
100
_check_if_dag_has_cycles (dag )
91
101
92
102
return dag
@@ -114,7 +124,7 @@ def pytask_dag_select_execution_dag(session: Session, dag: nx.DiGraph) -> None:
114
124
def pytask_dag_validate_dag (session : Session , dag : nx .DiGraph ) -> None :
115
125
"""Validate the DAG."""
116
126
_check_if_root_nodes_are_available (dag , session .config ["paths" ])
117
- _check_if_tasks_have_the_same_products (dag )
127
+ _check_if_tasks_have_the_same_products (dag , session . config [ "paths" ] )
118
128
119
129
120
130
def _have_task_or_neighbors_changed (
@@ -292,7 +302,7 @@ def _format_dictionary_to_tree(dict_: dict[str, list[str]], title: str) -> str:
292
302
return render_to_string (tree , console = console , strip_styles = True )
293
303
294
304
295
- def _check_if_tasks_have_the_same_products (dag : nx .DiGraph ) -> None :
305
+ def _check_if_tasks_have_the_same_products (dag : nx .DiGraph , paths : list [ Path ] ) -> None :
296
306
nodes_created_by_multiple_tasks = []
297
307
298
308
for node in dag .nodes :
@@ -303,19 +313,11 @@ def _check_if_tasks_have_the_same_products(dag: nx.DiGraph) -> None:
303
313
nodes_created_by_multiple_tasks .append (node )
304
314
305
315
if nodes_created_by_multiple_tasks :
306
- all_names = nodes_created_by_multiple_tasks + [
307
- predecessor
308
- for node in nodes_created_by_multiple_tasks
309
- for predecessor in dag .predecessors (node )
310
- ]
311
- common_ancestor = find_common_ancestor_of_nodes (* all_names )
312
316
dictionary = {}
313
317
for node in nodes_created_by_multiple_tasks :
314
- short_node_name = reduce_node_name (
315
- dag .nodes [node ]["node" ], [common_ancestor ]
316
- )
318
+ short_node_name = reduce_node_name (dag .nodes [node ]["node" ], paths )
317
319
short_predecessors = reduce_names_of_multiple_nodes (
318
- dag .predecessors (node ), dag , [ common_ancestor ]
320
+ dag .predecessors (node ), dag , paths
319
321
)
320
322
dictionary [short_node_name ] = short_predecessors
321
323
text = _format_dictionary_to_tree (dictionary , "Products from multiple tasks:" )
0 commit comments