@@ -76,16 +76,30 @@ def pytask_dag(session: Session) -> bool | None:
76
76
@hookimpl
77
77
def pytask_dag_create_dag (tasks : list [PTask ]) -> nx .DiGraph :
78
78
"""Create the DAG from tasks, dependencies and products."""
79
+
80
+ def _add_dependency (dag : nx .DiGraph , task : PTask , node : PNode ) -> None :
81
+ """Add a dependency to the DAG."""
82
+ dag .add_node (node .name , node = node )
83
+ dag .add_edge (node .name , task .name )
84
+
85
+ # If a node is a PythonNode wrapped in another PythonNode, it is a product from
86
+ # another task that is a dependency in the current task. Thus, draw an edge
87
+ # connecting the two nodes.
88
+ if isinstance (node , PythonNode ) and isinstance (node .value , PythonNode ):
89
+ dag .add_edge (node .value .name , node .name )
90
+
91
+ def _add_product (dag : nx .DiGraph , task : PTask , node : PNode ) -> None :
92
+ """Add a product to the DAG."""
93
+ dag .add_node (node .name , node = node )
94
+ dag .add_edge (task .name , node .name )
95
+
79
96
dag = nx .DiGraph ()
80
97
81
98
for task in tasks :
82
99
dag .add_node (task .name , task = task )
83
100
84
- tree_map (lambda x : dag .add_node (x .name , node = x ), task .depends_on )
85
- tree_map (lambda x : dag .add_edge (x .name , task .name ), task .depends_on )
86
-
87
- tree_map (lambda x : dag .add_node (x .name , node = x ), task .produces )
88
- tree_map (lambda x : dag .add_edge (task .name , x .name ), task .produces )
101
+ tree_map (lambda x : _add_dependency (dag , task , x ), task .depends_on )
102
+ tree_map (lambda x : _add_product (dag , task , x ), task .produces )
89
103
90
104
# If a node is a PythonNode wrapped in another PythonNode, it is a product from
91
105
# another task that is a dependency in the current task. Thus, draw an edge
0 commit comments