@@ -18,6 +18,7 @@ package appwrapper
18
18
19
19
import (
20
20
"context"
21
+ "encoding/json"
21
22
"fmt"
22
23
"time"
23
24
@@ -26,6 +27,7 @@ import (
26
27
v1 "k8s.io/api/core/v1"
27
28
apierrors "k8s.io/apimachinery/pkg/api/errors"
28
29
"k8s.io/apimachinery/pkg/api/meta"
30
+ kresource "k8s.io/apimachinery/pkg/api/resource"
29
31
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30
32
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
31
33
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -50,6 +52,107 @@ func parseComponent(raw []byte, expectedNamespace string) (*unstructured.Unstruc
50
52
return obj , nil
51
53
}
52
54
55
+ func hasResourceRequest (spec map [string ]interface {}, resource string ) bool {
56
+ usesResource := func (container map [string ]interface {}) bool {
57
+ _ , ok := container ["resources" ]
58
+ if ! ok {
59
+ return false
60
+ }
61
+ resources , ok := container ["resources" ].(map [string ]interface {})
62
+ if ! ok {
63
+ return false
64
+ }
65
+ for _ , key := range []string {"limits" , "requests" } {
66
+ if _ , ok := resources [key ]; ok {
67
+ if list , ok := resources [key ].(map [string ]interface {}); ok {
68
+ if _ , ok := list [resource ]; ok {
69
+ switch quantity := list [resource ].(type ) {
70
+ case int :
71
+ if quantity > 0 {
72
+ return true
73
+ }
74
+ case int32 :
75
+ if quantity > 0 {
76
+ return true
77
+ }
78
+ case int64 :
79
+ if quantity > 0 {
80
+ return true
81
+ }
82
+ case string :
83
+ kq , err := kresource .ParseQuantity (quantity )
84
+ if err == nil && ! kq .IsZero () {
85
+ return true
86
+ }
87
+ }
88
+ }
89
+ }
90
+ }
91
+ }
92
+ return false
93
+ }
94
+
95
+ for _ , key := range []string {"containers" , "initContainers" } {
96
+ if containers , ok := spec [key ]; ok {
97
+ if carray , ok := containers .([]interface {}); ok {
98
+ for _ , containerI := range carray {
99
+ container , ok := containerI .(map [string ]interface {})
100
+ if ok && usesResource (container ) {
101
+ return true
102
+ }
103
+ }
104
+ }
105
+ }
106
+ }
107
+
108
+ return false
109
+ }
110
+
111
+ func addNodeSelectorsToAffinity (spec map [string ]interface {}, selectorTerms []v1.NodeSelectorTerm ) error {
112
+ if _ , ok := spec ["affinity" ]; ! ok {
113
+ spec ["affinity" ] = map [string ]interface {}{}
114
+ }
115
+ affinity , ok := spec ["affinity" ].(map [string ]interface {})
116
+ if ! ok {
117
+ return fmt .Errorf ("spec.affinity is not a map" )
118
+ }
119
+ if _ , ok := affinity ["nodeAffinity" ]; ! ok {
120
+ affinity ["nodeAffinity" ] = map [string ]interface {}{}
121
+ }
122
+ nodeAffinity , ok := affinity ["nodeAffinity" ].(map [string ]interface {})
123
+ if ! ok {
124
+ return fmt .Errorf ("spec.affinity.nodeAffinity is not a map" )
125
+ }
126
+ if _ , ok := nodeAffinity ["requiredDuringSchedulingIgnoredDuringExecution" ]; ! ok {
127
+ nodeAffinity ["requiredDuringSchedulingIgnoredDuringExecution" ] = map [string ]interface {}{}
128
+ }
129
+ nodeSelector , ok := nodeAffinity ["requiredDuringSchedulingIgnoredDuringExecution" ].(map [string ]interface {})
130
+ if ! ok {
131
+ return fmt .Errorf ("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution is not a map" )
132
+ }
133
+ if _ , ok := nodeSelector ["nodeSelectorTerms" ]; ! ok {
134
+ nodeSelector ["nodeSelectorTerms" ] = []interface {}{}
135
+ }
136
+ existingTerms , ok := nodeSelector ["nodeSelectorTerms" ].([]interface {})
137
+ if ! ok {
138
+ return fmt .Errorf ("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms is not an array" )
139
+ }
140
+ for _ , termToAdd := range selectorTerms {
141
+ bytes , err := json .Marshal (termToAdd )
142
+ if err != nil {
143
+ return fmt .Errorf ("marshalling selectorTerm %v: %w" , termToAdd , err )
144
+ }
145
+ var obj interface {}
146
+ if err = json .Unmarshal (bytes , & obj ); err != nil {
147
+ return fmt .Errorf ("unmarshalling selectorTerm %v: %w" , termToAdd , err )
148
+ }
149
+ existingTerms = append (existingTerms , obj )
150
+ }
151
+ nodeSelector ["nodeSelectorTerms" ] = existingTerms
152
+
153
+ return nil
154
+ }
155
+
53
156
//gocyclo:ignore
54
157
func (r * AppWrapperReconciler ) createComponent (ctx context.Context , aw * workloadv1beta2.AppWrapper , componentIdx int ) (error , bool ) {
55
158
component := aw .Spec .Components [componentIdx ]
@@ -148,6 +251,28 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
148
251
spec ["schedulerName" ] = r .Config .SchedulerName
149
252
}
150
253
}
254
+
255
+ if r .Config .Autopilot != nil && r .Config .Autopilot .InjectAffinity {
256
+ toAdd := map [string ]string {}
257
+ for resource , labels := range r .Config .Autopilot .ResourceUnhealthyConfig {
258
+ if hasResourceRequest (spec , resource ) {
259
+ for k , v := range labels {
260
+ toAdd [k ] = v
261
+ }
262
+ }
263
+ }
264
+ if len (toAdd ) > 0 {
265
+ nodeSelectors := []v1.NodeSelectorTerm {}
266
+ for k , v := range toAdd {
267
+ nodeSelectors = append (nodeSelectors , v1.NodeSelectorTerm {
268
+ MatchExpressions : []v1.NodeSelectorRequirement {{Operator : v1 .NodeSelectorOpNotIn , Key : k , Values : []string {v }}},
269
+ })
270
+ }
271
+ if err := addNodeSelectorsToAffinity (spec , nodeSelectors ); err != nil {
272
+ log .FromContext (ctx ).Error (err , "failed to inject Autopilot affinities" )
273
+ }
274
+ }
275
+ }
151
276
}
152
277
153
278
if err := controllerutil .SetControllerReference (aw , obj , r .Scheme ); err != nil {
0 commit comments