Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 141 additions & 6 deletions backend/src/v2/component/launcher_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,20 +827,155 @@ func compileCmdAndArgs(executorInput *pipelinespec.ExecutorInput, cmd string, ar
executorInputJSONString := string(executorInputJSON)

compiledCmd := strings.ReplaceAll(cmd, executorInputJSONKey, executorInputJSONString)
compiledArgs := make([]string, 0, len(args))
for placeholder, replacement := range placeholders {
cmd = strings.ReplaceAll(cmd, placeholder, replacement)
compiledCmd = strings.ReplaceAll(compiledCmd, placeholder, replacement)
}

compiledArgs := make([]string, 0, len(args))
providedInputs := getProvidedInputs(executorInput)
for _, arg := range args {
compiledArgTemplate := strings.ReplaceAll(arg, executorInputJSONKey, executorInputJSONString)
for placeholder, replacement := range placeholders {
compiledArgTemplate = strings.ReplaceAll(compiledArgTemplate, placeholder, replacement)
expandedArgs, err := resolveStructPlaceholders(arg, providedInputs)
if err != nil {
return "", nil, err
}
for _, expanded := range expandedArgs {
compiledArgTemplate := strings.ReplaceAll(expanded, executorInputJSONKey, executorInputJSONString)
for placeholder, replacement := range placeholders {
compiledArgTemplate = strings.ReplaceAll(compiledArgTemplate, placeholder, replacement)
}
compiledArgs = append(compiledArgs, compiledArgTemplate)
}
compiledArgs = append(compiledArgs, compiledArgTemplate)
}
return compiledCmd, compiledArgs, nil
}

// getProvidedInputs returns the input names that have values supplied (parameters or artifacts).
func getProvidedInputs(executorInput *pipelinespec.ExecutorInput) map[string]struct{} {
provided := make(map[string]struct{})
for name, v := range executorInput.GetInputs().GetParameterValues() {
if v != nil {
provided[name] = struct{}{}
}
}
for name, artifactList := range executorInput.GetInputs().GetArtifacts() {
if artifactList != nil && len(artifactList.Artifacts) > 0 {
provided[name] = struct{}{}
}
}
return provided
}

// resolveStructPlaceholders expands IfPresent/Concat placeholder strings into concrete args.
// If no struct placeholder is detected, the original arg is returned.
func resolveStructPlaceholders(arg string, providedInputs map[string]struct{}) ([]string, error) {
if strings.HasPrefix(arg, `{"Concat": `) || strings.HasPrefix(arg, `{"IfPresent": `) {
var obj interface{}
if err := json.Unmarshal([]byte(arg), &obj); err != nil {
return nil, fmt.Errorf("failed to unmarshal struct placeholder %q: %w", arg, err)
}
resolved, err := recursivelyResolveStruct(obj, providedInputs)
if err != nil {
return nil, err
}
switch v := resolved.(type) {
case nil:
return []string{}, nil
case string:
return []string{v}, nil
case []string:
return v, nil
default:
return nil, fmt.Errorf("unexpected resolved struct placeholder type %T for %q", v, arg)
}
}

return []string{arg}, nil
}

// recursivelyResolveStruct handles nested IfPresent/Concat structures.
func recursivelyResolveStruct(obj interface{}, providedInputs map[string]struct{}) (interface{}, error) {
switch typed := obj.(type) {
case string:
return typed, nil
case []interface{}:
var parts []string
for _, item := range typed {
resolved, err := recursivelyResolveStruct(item, providedInputs)
if err != nil {
return nil, err
}
switch v := resolved.(type) {
case nil:
continue
case string:
parts = append(parts, v)
case []string:
parts = append(parts, v...)
default:
return nil, fmt.Errorf("unexpected list item type %T in struct placeholder", v)
}
}
return parts, nil
case map[string]interface{}:
if len(typed) != 1 {
return nil, fmt.Errorf("invalid struct placeholder: %v", typed)
}
for key, value := range typed {
switch key {
case "Concat":
items, ok := value.([]interface{})
if !ok {
return nil, fmt.Errorf("Concat value must be a list, got %T", value)
}
var parts []string
for _, item := range items {
resolved, err := recursivelyResolveStruct(item, providedInputs)
if err != nil {
return nil, err
}
switch v := resolved.(type) {
case nil:
continue
case string:
parts = append(parts, v)
case []string:
parts = append(parts, v...)
default:
return nil, fmt.Errorf("unexpected Concat item type %T", v)
}
}
return strings.Join(parts, ""), nil
case "IfPresent":
inner, ok := value.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("IfPresent value must be an object, got %T", value)
}
inputName, ok := inner["InputName"].(string)
if !ok {
return nil, fmt.Errorf("IfPresent.InputName must be a string, got %T", inner["InputName"])
}
_, exists := providedInputs[inputName]
var branch interface{}
if exists {
branch = inner["Then"]
} else {
branch = inner["Else"]
}
if branch == nil {
return nil, nil
}
return recursivelyResolveStruct(branch, providedInputs)
default:
return nil, fmt.Errorf("unsupported struct placeholder key %q", key)
}
}
default:
return nil, fmt.Errorf("unexpected struct placeholder type %T", typed)
}

return nil, nil
}

// Add executor input placeholders to provided map.
func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders map[string]string, err error) {
defer func() {
Expand Down
44 changes: 44 additions & 0 deletions backend/src/v2/component/launcher_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,50 @@ func Test_executorInput_compileCmdAndArgs(t *testing.T) {
assert.Equal(t, "9312", config["sphinx_port"])
}

func Test_compileCmdAndArgs_structPlaceholders(t *testing.T) {
executorInput := &pipelinespec.ExecutorInput{
Inputs: &pipelinespec.ExecutorInput_Inputs{
ParameterValues: map[string]*structpb.Value{
"file": structpb.NewStringValue("/etc/hosts"),
"line_number": structpb.NewBoolValue(true),
"flag_value": structpb.NewStringValue("foo"),
},
},
}

cmd := "cat"
args := []string{
"{{$.inputs.parameters['file']}}",
`{"IfPresent": {"InputName": "line_number", "Then": ["-n"]}}`,
`{"Concat": ["--flag=", "{{$.inputs.parameters['flag_value']}}"]}`,
}

compiledCmd, compiledArgs, err := compileCmdAndArgs(executorInput, cmd, args)
assert.NoError(t, err)
assert.Equal(t, "cat", compiledCmd)
assert.Equal(t, []string{"/etc/hosts", "-n", "--flag=foo"}, compiledArgs)
}

func Test_compileCmdAndArgs_structPlaceholders_Omitted(t *testing.T) {
executorInput := &pipelinespec.ExecutorInput{
Inputs: &pipelinespec.ExecutorInput_Inputs{
ParameterValues: map[string]*structpb.Value{
"file": structpb.NewStringValue("/etc/hosts"),
},
},
}

cmd := "cat"
args := []string{
"{{$.inputs.parameters['file']}}",
`{"IfPresent": {"InputName": "line_number", "Then": ["-n"]}}`,
}

_, compiledArgs, err := compileCmdAndArgs(executorInput, cmd, args)
assert.NoError(t, err)
assert.Equal(t, []string{"/etc/hosts"}, compiledArgs)
}

func Test_get_log_Writer(t *testing.T) {
old := osCreateFunc
defer func() { osCreateFunc = old }()
Expand Down