Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v3.5.2-atlan-0.4 #4

Open
wants to merge 7 commits into
base: release-3.5.2
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: Apply podSpecPatch in woc.execWf.Spec and template to pod sequen…
…tially

Signed-off-by: Tianchu Zhao <evantczhao@gmail.com>
tczhao committed Jun 3, 2024
commit dc7d4977d5d41ea0caef174d8a40e9fdcafd9fe5
27 changes: 13 additions & 14 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
@@ -45,10 +45,6 @@ var (
}
)

func (woc *wfOperationCtx) hasPodSpecPatch(tmpl *wfv1.Template) bool {
return woc.execWf.Spec.HasPodSpecPatch() || tmpl.HasPodSpecPatch()
}

// scheduleOnDifferentHost adds affinity to prevent retry on the same host when
// retryStrategy.affinity.nodeAntiAffinity{} is specified
func (woc *wfOperationCtx) scheduleOnDifferentHost(node *wfv1.NodeStatus, pod *apiv1.Pod) error {
@@ -347,24 +343,27 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
}
}

// Apply the patch string from template
if woc.hasPodSpecPatch(tmpl) {
tmpl.PodSpecPatch, err = util.PodSpecPatchMerge(woc.wf, tmpl)
if err != nil {
return nil, errors.Wrap(err, "", "Failed to merge the workflow PodSpecPatch with the template PodSpecPatch due to invalid format")
}

// Apply the patch string from workflow and template
var podSpecPatchs []string
if woc.execWf.Spec.HasPodSpecPatch() {
// Final substitution for workflow level PodSpecPatch
localParams := make(map[string]string)
if tmpl.IsPodType() {
localParams[common.LocalVarPodName] = pod.Name
}
tmpl, err := common.ProcessArgs(tmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.configMapInformer.GetIndexer())
newTmpl := tmpl.DeepCopy()
newTmpl.PodSpecPatch = woc.execWf.Spec.PodSpecPatch
processedTmpl, err := common.ProcessArgs(newTmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.configMapInformer.GetIndexer())
if err != nil {
return nil, errors.Wrap(err, "", "Failed to substitute the PodSpecPatch variables")
}

patchedPodSpec, err := util.ApplyPodSpecPatch(pod.Spec, tmpl.PodSpecPatch)
podSpecPatchs = append(podSpecPatchs, processedTmpl.PodSpecPatch)
}
if tmpl.HasPodSpecPatch() {
podSpecPatchs = append(podSpecPatchs, tmpl.PodSpecPatch)
}
if len(podSpecPatchs) > 0 {
patchedPodSpec, err := util.ApplyPodSpecPatch(pod.Spec, podSpecPatchs...)
if err != nil {
return nil, errors.Wrap(err, "", "Error applying PodSpecPatch")
}
44 changes: 16 additions & 28 deletions workflow/util/util.go
Original file line number Diff line number Diff line change
@@ -1237,44 +1237,32 @@ func ConvertYAMLToJSON(str string) (string, error) {
return str, nil
}

// PodSpecPatchMerge will do strategic merge the workflow level PodSpecPatch and template level PodSpecPatch
func PodSpecPatchMerge(wf *wfv1.Workflow, tmpl *wfv1.Template) (string, error) {
wfPatch, err := ConvertYAMLToJSON(wf.Spec.PodSpecPatch)
if err != nil {
return "", err
}
tmplPatch, err := ConvertYAMLToJSON(tmpl.PodSpecPatch)
if err != nil {
return "", err
}
data, err := strategicpatch.StrategicMergePatch([]byte(wfPatch), []byte(tmplPatch), apiv1.PodSpec{})
return string(data), err
}

func ApplyPodSpecPatch(podSpec apiv1.PodSpec, podSpecPatchYaml string) (*apiv1.PodSpec, error) {
func ApplyPodSpecPatch(podSpec apiv1.PodSpec, podSpecPatchYamls ...string) (*apiv1.PodSpec, error) {
podSpecJson, err := json.Marshal(podSpec)
if err != nil {
return nil, errors.Wrap(err, "", "Failed to marshal the Pod spec")
}

// must convert to json because PodSpec has only json tags
podSpecPatchJson, err := ConvertYAMLToJSON(podSpecPatchYaml)
if err != nil {
return nil, errors.Wrap(err, "", "Failed to convert the PodSpecPatch yaml to json")
}
for _, podSpecPatchYaml := range podSpecPatchYamls {
// must convert to json because PodSpec has only json tags
podSpecPatchJson, err := ConvertYAMLToJSON(podSpecPatchYaml)
if err != nil {
return nil, errors.Wrap(err, "", "Failed to convert the PodSpecPatch yaml to json")
}

// validate the patch to be a PodSpec
if err := json.Unmarshal([]byte(podSpecPatchJson), &apiv1.PodSpec{}); err != nil {
return nil, fmt.Errorf("invalid podSpecPatch %q: %w", podSpecPatchYaml, err)
}
// validate the patch to be a PodSpec
if err := json.Unmarshal([]byte(podSpecPatchJson), &apiv1.PodSpec{}); err != nil {
return nil, fmt.Errorf("invalid podSpecPatch %q: %w", podSpecPatchYaml, err)
}

modJson, err := strategicpatch.StrategicMergePatch(podSpecJson, []byte(podSpecPatchJson), apiv1.PodSpec{})
if err != nil {
return nil, errors.Wrap(err, "", "Error occurred during strategic merge patch")
podSpecJson, err = strategicpatch.StrategicMergePatch(podSpecJson, []byte(podSpecPatchJson), apiv1.PodSpec{})
if err != nil {
return nil, errors.Wrap(err, "", "Error occurred during strategic merge patch")
}
}

var newPodSpec apiv1.PodSpec
err = json.Unmarshal(modJson, &newPodSpec)
err = json.Unmarshal(podSpecJson, &newPodSpec)
if err != nil {
return nil, errors.Wrap(err, "", "Error in Unmarshalling after merge the patch")
}