Skip to content

Commit 38e531d

Browse files
authored
Refactor controller into multiple packages (#52)
Cleaner separation between the portions that should be eventually upstreamed to Kueue and those that should be maintained outside of Kueue.
1 parent 383d6c7 commit 38e531d

17 files changed

+629
-180
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ RUN go mod download
1414
# Copy the go source
1515
COPY cmd/main.go cmd/main.go
1616
COPY api/ api/
17-
COPY internal/controller/ internal/controller/
17+
COPY internal/ internal/
1818

1919
# Build
2020
# the GOARCH has not a default value to allow the binary be built according to the host where the command

README.md

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ be jointly queued and admitted for execution using [Kueue](https://kueue.sigs.k8
88
AppWrappers provide a flexible and workload-agnostic mechanism for enabling
99
Kueue to manage a group of Kubernetes resources
1010
as a single logical unit without requiring any Kueue-specific support by
11-
the controllers of those resources.
11+
the controllers of those resources. If some of the contained resources
12+
are Kueue-enabled, the AppWrapper controller ensures that the admission
13+
by Kueue of the parent AppWrapper will be propagated appropriately.
1214

1315
## Description
1416

@@ -26,7 +28,7 @@ set to true. We also leverage the Admission Controller to ensure that
2628
the user creating the AppWrapper is also entitled to create the contained resources
2729
and to validate AppWrapper-specific invariants.
2830

29-
See [appwrapper_webhook.go](./internal/controller/appwrapper_webhook.go)
31+
See [appwrapper_webhook.go](./internal/webhook/appwrapper_webhook.go)
3032
for the implementation.
3133

3234
#### Workload Controller
@@ -39,7 +41,15 @@ the two. This controller will make it possible for Kueue to suspend,
3941
resume, and constrain the placement of the AppWrapper. It will report
4042
the status of the AppWrapper to Kueue.
4143

42-
See [workload_controller.go](./internal/controller/workload_controller.go)
44+
See [workload_controller.go](./internal/controller/workload/workload_controller.go)
45+
for the implementation.
46+
47+
A small additional piece of logic is currently needed to generalize
48+
Kueue's ability to recognize parent/children relationships and enforce
49+
that admission by Kueue of the parent AppWrapper will be propagated to
50+
its immediate children.
51+
52+
See [child_admission_controller.go](./internal/controller/workload/child_admission_controller.go)
4353
for the implementation.
4454

4555
#### Framework Controller
@@ -52,7 +62,7 @@ status made by the Workload Controller described above.
5262

5363
This [state transition diagram](docs/state-diagram.md) depicts the
5464
lifecycle of an AppWrapper; the implementation is found in
55-
[appwrapper_controller.go](./internal/controller/appwrapper_controller.go).
65+
[appwrapper_controller.go](./internal/controller/appwrapper/appwrapper_controller.go).
5666

5767
## Getting Started
5868

cmd/main.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
3737

3838
workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
39+
"github.com/project-codeflare/appwrapper/internal/config"
3940
"github.com/project-codeflare/appwrapper/internal/controller"
4041
//+kubebuilder:scaffold:imports
4142
)
@@ -61,7 +62,7 @@ func main() {
6162
var secureMetrics bool
6263
var enableHTTP2 bool
6364

64-
config := controller.AppWrapperConfig{}
65+
awConfig := config.AppWrapperConfig{}
6566

6667
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
6768
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
@@ -72,7 +73,7 @@ func main() {
7273
"If set the metrics endpoint is served securely")
7374
flag.BoolVar(&enableHTTP2, "enable-http2", false,
7475
"If set, HTTP/2 will be enabled for the metrics and webhook servers")
75-
flag.BoolVar(&config.ManageJobsWithoutQueueName, "manage-no-queue", true, "Manage AppWrappers without queue names")
76+
flag.BoolVar(&awConfig.ManageJobsWithoutQueueName, "manage-no-queue", true, "Manage AppWrappers without queue names")
7677
opts := zap.Options{
7778
Development: true,
7879
}
@@ -81,7 +82,7 @@ func main() {
8182

8283
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
8384
setupLog.Info("Build info", "version", BuildVersion, "date", BuildDate)
84-
setupLog.Info("Configuration", "config", config)
85+
setupLog.Info("Configuration", "config", awConfig)
8586

8687
// if the enable-http2 flag is false (the default), http/2 should be disabled
8788
// due to its vulnerabilities. More specifically, disabling http/2 will
@@ -132,7 +133,7 @@ func main() {
132133
}
133134

134135
ctx := ctrl.SetupSignalHandler()
135-
err = controller.SetupWithManager(ctx, mgr, &config)
136+
err = controller.SetupWithManager(ctx, mgr, &awConfig)
136137
if err != nil {
137138
setupLog.Error(err, "unable to start appwrapper controllers")
138139
os.Exit(1)

internal/config/config.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
Copyright 2024 IBM Corporation.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package config
18+
19+
type AppWrapperConfig struct {
20+
ManageJobsWithoutQueueName bool `json:"manageJobsWithoutQueueName,omitempty"`
21+
}

internal/controller/appwrapper_controller.go renamed to internal/controller/appwrapper/appwrapper_controller.go

Lines changed: 10 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package controller
17+
package appwrapper
1818

1919
import (
2020
"context"
@@ -36,27 +36,26 @@ import (
3636
"sigs.k8s.io/controller-runtime/pkg/log"
3737
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3838

39-
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
4039
"sigs.k8s.io/kueue/pkg/controller/constants"
41-
"sigs.k8s.io/kueue/pkg/controller/jobframework"
4240
"sigs.k8s.io/kueue/pkg/podset"
4341
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
44-
"sigs.k8s.io/kueue/pkg/workload"
4542

4643
workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
44+
"github.com/project-codeflare/appwrapper/internal/config"
45+
"github.com/project-codeflare/appwrapper/internal/utils"
4746
)
4847

4948
const (
5049
AppWrapperLabel = "workload.codeflare.dev/appwrapper"
51-
appWrapperFinalizer = "workload.codeflare.dev/finalizer"
50+
AppWrapperFinalizer = "workload.codeflare.dev/finalizer"
5251
childJobQueueName = "workload.codeflare.dev.admitted"
5352
)
5453

5554
// AppWrapperReconciler reconciles an appwrapper
5655
type AppWrapperReconciler struct {
5756
client.Client
5857
Scheme *runtime.Scheme
59-
Config *AppWrapperConfig
58+
Config *config.AppWrapperConfig
6059
}
6160

6261
type podStatusSummary struct {
@@ -72,10 +71,6 @@ type podStatusSummary struct {
7271
//+kubebuilder:rbac:groups=workload.codeflare.dev,resources=appwrappers/status,verbs=get;update;patch
7372
//+kubebuilder:rbac:groups=workload.codeflare.dev,resources=appwrappers/finalizers,verbs=update
7473

75-
// permission to manipulate workloads controlling appwrapper components to enable admitting them to our pseudo-clusterqueue
76-
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get
77-
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch
78-
7974
// permission to edit wrapped resources: pods, services, jobs, podgroups, pytorchjobs, rayclusters
8075

8176
//+kubebuilder:rbac:groups="",resources=pods;services,verbs=get;list;watch;create;update;patch;delete
@@ -100,7 +95,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
10095

10196
// handle deletion first
10297
if !aw.DeletionTimestamp.IsZero() {
103-
if controllerutil.ContainsFinalizer(aw, appWrapperFinalizer) {
98+
if controllerutil.ContainsFinalizer(aw, AppWrapperFinalizer) {
10499
statusUpdated := false
105100
if meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.ResourcesDeployed)) {
106101
if !r.deleteComponents(ctx, aw) {
@@ -136,7 +131,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
136131
}
137132
}
138133

139-
if controllerutil.RemoveFinalizer(aw, appWrapperFinalizer) {
134+
if controllerutil.RemoveFinalizer(aw, AppWrapperFinalizer) {
140135
if err := r.Update(ctx, aw); err != nil {
141136
return ctrl.Result{}, err
142137
}
@@ -149,7 +144,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
149144
switch aw.Status.Phase {
150145

151146
case workloadv1beta2.AppWrapperEmpty: // initial state, inject finalizer
152-
if controllerutil.AddFinalizer(aw, appWrapperFinalizer) {
147+
if controllerutil.AddFinalizer(aw, AppWrapperFinalizer) {
153148
if err := r.Update(ctx, aw); err != nil {
154149
return ctrl.Result{}, err
155150
}
@@ -224,7 +219,6 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
224219
})
225220
return ctrl.Result{RequeueAfter: time.Minute}, r.Status().Update(ctx, aw)
226221
}
227-
r.propagateAdmission(ctx, aw)
228222
meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
229223
Type: string(workloadv1beta2.PodsReady),
230224
Status: metav1.ConditionFalse,
@@ -339,7 +333,7 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
339333
for podSetsIdx, podSet := range component.PodSets {
340334
toInject := component.PodSetInfos[podSetsIdx]
341335

342-
p, err := getRawTemplate(obj.UnstructuredContent(), podSet.Path)
336+
p, err := utils.GetRawTemplate(obj.UnstructuredContent(), podSet.Path)
343337
if err != nil {
344338
return nil, err, true // Should not happen, path validity is enforced by validateAppWrapperInvariants
345339
}
@@ -411,36 +405,6 @@ func (r *AppWrapperReconciler) createComponents(ctx context.Context, aw *workloa
411405
return nil, false
412406
}
413407

414-
func (r *AppWrapperReconciler) propagateAdmission(ctx context.Context, aw *workloadv1beta2.AppWrapper) {
415-
for componentIdx, component := range aw.Spec.Components {
416-
if len(component.PodSets) > 0 {
417-
obj, err := parseComponent(aw, component.Template.Raw)
418-
if err != nil {
419-
return
420-
}
421-
wlName := jobframework.GetWorkloadNameForOwnerWithGVK(obj.GetName(), obj.GroupVersionKind())
422-
wl := &kueue.Workload{}
423-
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: aw.Namespace, Name: wlName}, wl); err == nil {
424-
if !workload.IsAdmitted(wl) {
425-
admission := kueue.Admission{
426-
ClusterQueue: childJobQueueName,
427-
PodSetAssignments: make([]kueue.PodSetAssignment, len(aw.Spec.Components[componentIdx].PodSets)),
428-
}
429-
for i := range admission.PodSetAssignments {
430-
admission.PodSetAssignments[i].Name = wl.Spec.PodSets[i].Name
431-
}
432-
newWorkload := wl.DeepCopy()
433-
workload.SetQuotaReservation(newWorkload, &admission)
434-
_ = workload.SyncAdmittedCondition(newWorkload)
435-
if err = workload.ApplyAdmissionStatus(ctx, r.Client, newWorkload, false); err != nil {
436-
log.FromContext(ctx).Error(err, "syncing admission", "appwrapper", aw, "componentIdx", componentIdx, "workload", wl, "newworkload", newWorkload)
437-
}
438-
}
439-
}
440-
}
441-
}
442-
}
443-
444408
func (r *AppWrapperReconciler) deleteComponents(ctx context.Context, aw *workloadv1beta2.AppWrapper) bool {
445409
// TODO forceful deletion: See https://github.com/project-codeflare/appwrapper/issues/36
446410
log := log.FromContext(ctx)
@@ -478,7 +442,7 @@ func (r *AppWrapperReconciler) workloadStatus(ctx context.Context, aw *workloadv
478442
client.MatchingLabels{AppWrapperLabel: aw.Name}); err != nil {
479443
return nil, err
480444
}
481-
summary := &podStatusSummary{expected: ExpectedPodCount(aw)}
445+
summary := &podStatusSummary{expected: utils.ExpectedPodCount(aw)}
482446

483447
for _, pod := range pods.Items {
484448
switch pod.Status.Phase {
@@ -496,24 +460,6 @@ func (r *AppWrapperReconciler) workloadStatus(ctx context.Context, aw *workloadv
496460
return summary, nil
497461
}
498462

499-
func replicas(ps workloadv1beta2.AppWrapperPodSet) int32 {
500-
if ps.Replicas == nil {
501-
return 1
502-
} else {
503-
return *ps.Replicas
504-
}
505-
}
506-
507-
func ExpectedPodCount(aw *workloadv1beta2.AppWrapper) int32 {
508-
var expected int32
509-
for _, c := range aw.Spec.Components {
510-
for _, s := range c.PodSets {
511-
expected += replicas(s)
512-
}
513-
}
514-
return expected
515-
}
516-
517463
// SetupWithManager sets up the controller with the Manager.
518464
func (r *AppWrapperReconciler) SetupWithManager(mgr ctrl.Manager) error {
519465
return ctrl.NewControllerManagedBy(mgr).

0 commit comments

Comments
 (0)