feat(backend): Add support for job and task placeholders in the KFP backend (#11599)

This adds support for the following placeholders in the KFP backend:

- dsl.PIPELINE_JOB_NAME_PLACEHOLDER
- dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER
- dsl.PIPELINE_JOB_ID_PLACEHOLDER
- dsl.PIPELINE_TASK_NAME_PLACEHOLDER
- dsl.PIPELINE_TASK_ID_PLACEHOLDER

Resolves:
#10453

Signed-off-by: mprahl <[email protected]>
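
For orientation, here is a condensed sketch of how a pipeline author wires these placeholders into a component; the full sample added by this commit is samples/v2/pipeline_with_placeholders.py below. The component and pipeline names are illustrative, and the comments note what each value resolves to on the KFP backend.

from kfp import dsl


@dsl.component
def show_run_context(job_name: str, job_id: str, task_name: str):
    # job_name resolves to the run's display name.
    # job_id resolves to the run UUID.
    # task_name resolves to the task (component) name.
    print(job_name, job_id, task_name)


@dsl.pipeline(name="placeholder-demo")
def placeholder_demo():
    show_run_context(
        job_name=dsl.PIPELINE_JOB_NAME_PLACEHOLDER,
        job_id=dsl.PIPELINE_JOB_ID_PLACEHOLDER,
        task_name=dsl.PIPELINE_TASK_NAME_PLACEHOLDER,
    )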
mprahl authored Feb 10, 2025
1 parent 926aec5 commit 6a13f4b
Showing 15 changed files with 153 additions and 5 deletions.
2 changes: 1 addition & 1 deletion backend/src/apiserver/template/v2_template.go
@@ -296,7 +296,7 @@ func (t *V2Spec) RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (u
if err := protojson.Unmarshal(bytes, spec); err != nil {
return nil, util.NewInternalServerError(err, "Failed to parse pipeline spec")
}
job := &pipelinespec.PipelineJob{PipelineSpec: spec}
job := &pipelinespec.PipelineJob{PipelineSpec: spec, DisplayName: modelRun.DisplayName}
jobRuntimeConfig, err := modelToPipelineJobRuntimeConfig(&modelRun.RuntimeConfig)
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to convert to PipelineJob RuntimeConfig")
4 changes: 4 additions & 0 deletions backend/src/v2/cmd/driver/main.go
@@ -46,6 +46,8 @@ var (
driverType = flag.String(driverTypeArg, "", "task driver type, one of ROOT_DAG, DAG, CONTAINER")
pipelineName = flag.String("pipeline_name", "", "pipeline context name")
runID = flag.String("run_id", "", "pipeline run uid")
runName = flag.String("run_name", "", "pipeline run name (Kubernetes object name)")
runDisplayName = flag.String("run_display_name", "", "pipeline run display name")
componentSpecJson = flag.String("component", "{}", "component spec")
taskSpecJson = flag.String("task", "", "task spec")
runtimeConfigJson = flag.String("runtime_config", "", "jobruntime config")
@@ -153,6 +155,8 @@ func drive() (err error) {
options := driver.Options{
PipelineName: *pipelineName,
RunID: *runID,
RunName: *runName,
RunDisplayName: *runDisplayName,
Namespace: namespace,
Component: componentSpec,
Task: taskSpec,
5 changes: 5 additions & 0 deletions backend/src/v2/compiler/argocompiler/argo.go
@@ -343,6 +343,11 @@ func runID() string {
return "{{workflow.uid}}"
}

func runResourceName() string {
// This translates to the Argo Workflow object name.
return "{{workflow.name}}"
}

func workflowParameter(name string) string {
return fmt.Sprintf("{{workflow.parameters.%s}}", name)
}
2 changes: 2 additions & 0 deletions backend/src/v2/compiler/argocompiler/container.go
@@ -166,6 +166,8 @@ func (c *workflowCompiler) addContainerDriverTemplate() string {
"--type", "CONTAINER",
"--pipeline_name", c.spec.GetPipelineInfo().GetName(),
"--run_id", runID(),
"--run_name", runResourceName(),
"--run_display_name", c.job.DisplayName,
"--dag_execution_id", inputValue(paramParentDagID),
"--component", inputValue(paramComponent),
"--task", inputValue(paramTask),
2 changes: 2 additions & 0 deletions backend/src/v2/compiler/argocompiler/dag.go
@@ -561,6 +561,8 @@ func (c *workflowCompiler) addDAGDriverTemplate() string {
"--type", inputValue(paramDriverType),
"--pipeline_name", c.spec.GetPipelineInfo().GetName(),
"--run_id", runID(),
"--run_name", runResourceName(),
"--run_display_name", c.job.DisplayName,
"--dag_execution_id", inputValue(paramParentDagID),
"--component", inputValue(paramComponent),
"--task", inputValue(paramTask),
@@ -47,6 +47,10 @@ spec:
- my-pipeline
- --run_id
- '{{workflow.uid}}'
- --run_name
- '{{workflow.name}}'
- --run_display_name
- ''
- --dag_execution_id
- '{{inputs.parameters.parent-dag-id}}'
- --component
@@ -283,6 +287,10 @@ spec:
- my-pipeline
- --run_id
- '{{workflow.uid}}'
- --run_name
- '{{workflow.name}}'
- --run_display_name
- ''
- --dag_execution_id
- '{{inputs.parameters.parent-dag-id}}'
- --component
@@ -35,6 +35,10 @@ spec:
- namespace/n1/pipeline/hello-world
- --run_id
- '{{workflow.uid}}'
- --run_name
- '{{workflow.name}}'
- --run_display_name
- ''
- --dag_execution_id
- '{{inputs.parameters.parent-dag-id}}'
- --component
@@ -226,6 +230,10 @@ spec:
- namespace/n1/pipeline/hello-world
- --run_id
- '{{workflow.uid}}'
- --run_name
- '{{workflow.name}}'
- --run_display_name
- ''
- --dag_execution_id
- '{{inputs.parameters.parent-dag-id}}'
- --component
@@ -52,6 +52,10 @@ spec:
- pipeline-with-exit-handler
- --run_id
- '{{workflow.uid}}'
- --run_name
- '{{workflow.name}}'
- --run_display_name
- ''
- --dag_execution_id
- '{{inputs.parameters.parent-dag-id}}'
- --component
@@ -289,6 +293,10 @@ spec:
- pipeline-with-exit-handler
- --run_id
- '{{workflow.uid}}'
- --run_name
- '{{workflow.name}}'
- --run_display_name
- ''
- --dag_execution_id
- '{{inputs.parameters.parent-dag-id}}'
- --component
@@ -33,6 +33,10 @@ spec:
- namespace/n1/pipeline/hello-world
- --run_id
- '{{workflow.uid}}'
- --run_name
- '{{workflow.name}}'
- --run_display_name
- ''
- --dag_execution_id
- '{{inputs.parameters.parent-dag-id}}'
- --component
@@ -216,6 +220,10 @@ spec:
- namespace/n1/pipeline/hello-world
- --run_id
- '{{workflow.uid}}'
- --run_name
- '{{workflow.name}}'
- --run_display_name
- ''
- --dag_execution_id
- '{{inputs.parameters.parent-dag-id}}'
- --component
4 changes: 4 additions & 0 deletions backend/src/v2/compiler/argocompiler/testdata/importer.yaml
@@ -105,6 +105,10 @@ spec:
- pipeline-with-importer
- --run_id
- '{{workflow.uid}}'
- --run_name
- '{{workflow.name}}'
- --run_display_name
- ''
- --dag_execution_id
- '{{inputs.parameters.parent-dag-id}}'
- --component
@@ -50,6 +50,10 @@ spec:
- my-pipeline
- --run_id
- '{{workflow.uid}}'
- --run_name
- '{{workflow.name}}'
- --run_display_name
- ''
- --dag_execution_id
- '{{inputs.parameters.parent-dag-id}}'
- --component
@@ -308,6 +312,10 @@ spec:
- my-pipeline
- --run_id
- '{{workflow.uid}}'
- --run_name
- '{{workflow.name}}'
- --run_display_name
- ''
- --dag_execution_id
- '{{inputs.parameters.parent-dag-id}}'
- --component
32 changes: 28 additions & 4 deletions backend/src/v2/driver/driver.go
@@ -74,6 +74,11 @@ type Options struct {

// optional, allows to specify kubernetes-specific executor config
KubernetesExecutorConfig *kubernetesplatform.KubernetesExecutorConfig

// optional, required only if the {{$.pipeline_job_resource_name}} placeholder is used
RunName string
// optional, required only if the {{$.pipeline_job_name}} placeholder is used
RunDisplayName string
}

// Identifying information used for error messages
@@ -260,7 +265,7 @@ func Container(ctx context.Context, opts Options, mlmd *metadata.Client, cacheCl
if err != nil {
return nil, err
}
inputs, err := resolveInputs(ctx, dag, iterationIndex, pipeline, opts.Task, opts.Component.GetInputDefinitions(), mlmd, expr)
inputs, err := resolveInputs(ctx, dag, iterationIndex, pipeline, opts, mlmd, expr)
if err != nil {
return nil, err
}
@@ -809,7 +814,7 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E
if err != nil {
return nil, err
}
inputs, err := resolveInputs(ctx, dag, iterationIndex, pipeline, opts.Task, opts.Component.GetInputDefinitions(), mlmd, expr)
inputs, err := resolveInputs(ctx, dag, iterationIndex, pipeline, opts, mlmd, expr)
if err != nil {
return nil, err
}
@@ -1034,12 +1039,16 @@ func validateNonRoot(opts Options) error {
return nil
}

func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, pipeline *metadata.Pipeline, task *pipelinespec.PipelineTaskSpec, inputsSpec *pipelinespec.ComponentInputsSpec, mlmd *metadata.Client, expr *expression.Expr) (inputs *pipelinespec.ExecutorInput_Inputs, err error) {
func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, pipeline *metadata.Pipeline, opts Options, mlmd *metadata.Client, expr *expression.Expr) (inputs *pipelinespec.ExecutorInput_Inputs, err error) {
defer func() {
if err != nil {
err = fmt.Errorf("failed to resolve inputs: %w", err)
}
}()

task := opts.Task
inputsSpec := opts.Component.GetInputDefinitions()

glog.V(4).Infof("dag: %v", dag)
glog.V(4).Infof("task: %v", task)
inputParams, _, err := dag.Execution.GetParameters()
@@ -1245,7 +1254,22 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int,
runtimeValue := paramSpec.GetRuntimeValue()
switch t := runtimeValue.Value.(type) {
case *pipelinespec.ValueOrRuntimeParameter_Constant:
inputs.ParameterValues[name] = runtimeValue.GetConstant()
val := runtimeValue.GetConstant()

switch val.GetStringValue() {
case "{{$.pipeline_job_name}}":
inputs.ParameterValues[name] = structpb.NewStringValue(opts.RunDisplayName)
case "{{$.pipeline_job_resource_name}}":
inputs.ParameterValues[name] = structpb.NewStringValue(opts.RunName)
case "{{$.pipeline_job_uuid}}":
inputs.ParameterValues[name] = structpb.NewStringValue(opts.RunID)
case "{{$.pipeline_task_name}}":
inputs.ParameterValues[name] = structpb.NewStringValue(task.GetTaskInfo().GetName())
case "{{$.pipeline_task_uuid}}":
inputs.ParameterValues[name] = structpb.NewStringValue(fmt.Sprintf("%d", opts.DAGExecutionID))
default:
inputs.ParameterValues[name] = val
}
default:
return nil, paramError(fmt.Errorf("param runtime value spec of type %T not implemented", t))
}
59 changes: 59 additions & 0 deletions samples/v2/pipeline_with_placeholders.py
@@ -0,0 +1,59 @@
# Copyright 2025 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import compiler
from kfp import dsl
from kfp.dsl import component


@component
def print_all_placeholders(
    job_name: str,
    job_resource_name: str,
    job_id: str,
    task_name: str,
    task_id: str,
):
    allPlaceholders = [job_name, job_resource_name, job_id, task_name, task_id]

    for placeholder in allPlaceholders:
        if "{{" in placeholder or placeholder == "":
            raise RuntimeError(
                "Expected the placeholder to be replaced with a value: " + placeholder
            )

    assert task_name == "print-all-placeholders"
    assert job_name.startswith("pipeline-with-placeholders ")
    assert job_resource_name.startswith("pipeline-with-placeholders-")

    output = ", ".join(allPlaceholders)
    print(output)


@dsl.pipeline(name="pipeline-with-placeholders")
def pipeline_with_placeholders():
    print_all_placeholders(
        job_name=dsl.PIPELINE_JOB_NAME_PLACEHOLDER,
        job_resource_name=dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER,
        job_id=dsl.PIPELINE_JOB_ID_PLACEHOLDER,
        task_name=dsl.PIPELINE_TASK_NAME_PLACEHOLDER,
        task_id=dsl.PIPELINE_TASK_ID_PLACEHOLDER,
    ).set_caching_options(False)


if __name__ == "__main__":
    compiler.Compiler().compile(
        pipeline_func=pipeline_with_placeholders,
        package_path=__file__.replace(".py", ".yaml"),
    )
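
A usage sketch to follow the sample, not part of this commit: one way to submit the pipeline above to a running KFP instance so the placeholders resolve against a real run. The host URL is a stand-in for your own endpoint, and the import assumes the sample module is on the Python path.

from kfp.client import Client

from pipeline_with_placeholders import pipeline_with_placeholders

client = Client(host="http://localhost:8080")  # assumed endpoint
run = client.create_run_from_pipeline_func(
    pipeline_func=pipeline_with_placeholders,
    arguments={},
    enable_caching=False,  # mirrors the sample's set_caching_options(False)
)
print(run.run_id)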
2 changes: 2 additions & 0 deletions samples/v2/sample_test.py
@@ -29,6 +29,7 @@
import producer_consumer_param
import subdagio
import two_step_pipeline_containerized
import pipeline_with_placeholders

_MINUTE = 60 # seconds
_DEFAULT_TIMEOUT = 5 * _MINUTE
@@ -74,6 +75,7 @@ def test(self):
TestCase(pipeline_func=subdagio.artifact.crust),
TestCase(
pipeline_func=subdagio.multiple_artifacts_namedtuple.crust),
TestCase(pipeline_func=pipeline_with_placeholders.pipeline_with_placeholders),
]

with ThreadPoolExecutor() as executor:
6 changes: 6 additions & 0 deletions sdk/python/kfp/dsl/__init__.py
@@ -68,6 +68,7 @@
# hack: constants and custom type generics have to be defined here to be captured by autodoc and autodocsumm used in ./docs/conf.py
PIPELINE_JOB_NAME_PLACEHOLDER = '{{$.pipeline_job_name}}'
"""A placeholder used to obtain a pipeline job name within a task at pipeline runtime.
In Kubeflow Pipelines, this maps to the pipeline run display name.
Example:
::
@@ -82,6 +83,8 @@ def my_pipeline():

PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER = '{{$.pipeline_job_resource_name}}'
"""A placeholder used to obtain a pipeline job resource name within a task at pipeline runtime.
In Kubeflow Pipelines, this maps to the pipeline run name in the underlying pipeline engine (e.g. an Argo Workflow
object name).
Example:
::
@@ -96,6 +99,7 @@ def my_pipeline():

PIPELINE_JOB_ID_PLACEHOLDER = '{{$.pipeline_job_uuid}}'
"""A placeholder used to obtain a pipeline job ID within a task at pipeline runtime.
In Kubeflow Pipelines, this maps to the pipeline run UUID.
Example:
::
@@ -110,6 +114,7 @@ def my_pipeline():

PIPELINE_TASK_NAME_PLACEHOLDER = '{{$.pipeline_task_name}}'
"""A placeholder used to obtain a task name within a task at pipeline runtime.
In Kubeflow Pipelines, this maps to the component name.
Example:
::
@@ -124,6 +129,7 @@ def my_pipeline():

PIPELINE_TASK_ID_PLACEHOLDER = '{{$.pipeline_task_uuid}}'
"""A placeholder used to obtain a task ID within a task at pipeline runtime.
In Kubeflow Pipelines, this maps to the component's ML Metadata (MLMD) execution ID.
Example:
::
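
Taken together, the KFP-specific mappings documented in these docstrings can be summarized in one place. The dict below is an illustrative reference only, not part of the SDK.

from kfp import dsl

# What each constant resolves to at runtime on the Kubeflow Pipelines backend,
# per the docstring notes in this commit (illustrative reference only).
KFP_PLACEHOLDER_MEANINGS = {
    dsl.PIPELINE_JOB_NAME_PLACEHOLDER: "pipeline run display name",
    dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER: "run name in the pipeline engine (e.g. Argo Workflow object name)",
    dsl.PIPELINE_JOB_ID_PLACEHOLDER: "pipeline run UUID",
    dsl.PIPELINE_TASK_NAME_PLACEHOLDER: "component (task) name",
    dsl.PIPELINE_TASK_ID_PLACEHOLDER: "component's MLMD execution ID",
}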
