Skip to content

Commit 76215ae

Browse files
committed
Add autoscaling
1 parent 52cda2c commit 76215ae

File tree

15 files changed

+666
-160
lines changed

15 files changed

+666
-160
lines changed

Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ ARG RELEASE_VERSION
77

88
ADD . /go/src/github.com/kubeflow/mpi-operator
99
WORKDIR /go/src/github.com/kubeflow/mpi-operator
10+
RUN apt update
11+
RUN apt install -y build-essential cmake zlib1g-dev
1012
RUN make RELEASE_VERSION=${RELEASE_VERSION} mpi-operator.$VERSION
1113
RUN ln -s mpi-operator.${VERSION} _output/cmd/bin/mpi-operator
1214

Makefile

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
BIN_DIR=_output/cmd/bin
1616
REPO_PATH="github.com/kubeflow/mpi-operator"
17+
CHARM_REPO="https://github.com/charmplusplus/charm.git"
1718
REL_OSARCH="linux/amd64"
1819
GitSHA=$(shell git rev-parse HEAD)
1920
Date=$(shell date "+%Y-%m-%d %H:%M:%S")
@@ -47,10 +48,10 @@ CRD_OPTIONS ?= "crd:generateEmbeddedObjectMeta=true"
4748

4849
build: all
4950

50-
all: ${BIN_DIR} fmt vet tidy lint test mpi-operator.v2
51+
all: ${BIN_DIR} fmt vet tidy lint test charm mpi-operator.v2
5152

5253
.PHONY: mpi-operator.v2
53-
mpi-operator.v2:
54+
mpi-operator.v2: charm
5455
go build -ldflags ${LD_FLAGS_V2} -o ${BIN_DIR}/mpi-operator.v2 ./cmd/mpi-operator/
5556

5657
${BIN_DIR}:
@@ -209,3 +210,11 @@ volcano-scheduler-crd: volcano-scheduler
209210
volcano-scheduler-deploy: volcano-scheduler-crd
210211
mkdir -p $(PROJECT_DIR)/dep-manifests/volcano-scheduler/
211212
cp -f /tmp/volcano.sh/volcano/installer/volcano-development.yaml $(PROJECT_DIR)/dep-manifests/volcano-scheduler/
213+
214+
.PHONY: charm
215+
charm: ${BIN_DIR}
216+
rm -rf $(PROJECT_DIR)/dep-libs/charm
217+
mkdir -p $(PROJECT_DIR)/dep-libs/charm
218+
git clone $(CHARM_REPO) $(PROJECT_DIR)/dep-libs/charm
219+
cd $(PROJECT_DIR)/dep-libs/charm && git checkout shrinkexpand-fix && ./build charm++ netlrts-linux-x86_64 --enable-shrinkexpand -j8 --force
220+
cd pkg/controller && $(PROJECT_DIR)/dep-libs/charm/bin/charmc -language c++ -seq -o ${PROJECT_DIR}/${BIN_DIR}/rescale_client rescale_client.C -lccs-client

cmd/mpi-operator/app/server.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"k8s.io/client-go/tools/leaderelection/resourcelock"
4040
"k8s.io/client-go/tools/record"
4141
"k8s.io/klog"
42+
metrics "k8s.io/metrics/pkg/client/clientset/versioned"
4243
schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
4344
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
4445

@@ -114,7 +115,7 @@ func Run(opt *options.ServerOption) error {
114115
cfg.Burst = opt.Burst
115116

116117
// Create clients.
117-
kubeClient, leaderElectionClientSet, mpiJobClientSet, volcanoClientSet, schedClientSet, err := createClientSets(cfg, opt.GangSchedulingName)
118+
kubeClient, metricsClient, leaderElectionClientSet, mpiJobClientSet, volcanoClientSet, schedClientSet, err := createClientSets(cfg, opt.GangSchedulingName)
118119
if err != nil {
119120
return err
120121
}
@@ -143,6 +144,7 @@ func Run(opt *options.ServerOption) error {
143144

144145
controller, err := controllersv1.NewMPIJobController(
145146
kubeClient,
147+
metricsClient,
146148
mpiJobClientSet,
147149
volcanoClientSet,
148150
schedClientSet,
@@ -259,6 +261,7 @@ func createClientSets(
259261
gangSchedulingName string,
260262
) (
261263
kubeclientset.Interface,
264+
metrics.Interface,
262265
kubeclientset.Interface,
263266
mpijobclientset.Interface,
264267
volcanoclient.Interface,
@@ -268,17 +271,22 @@ func createClientSets(
268271

269272
kubeClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "mpi-operator"))
270273
if err != nil {
271-
return nil, nil, nil, nil, nil, err
274+
return nil, nil, nil, nil, nil, nil, err
275+
}
276+
277+
metricsClientSet, err := metrics.NewForConfig(config)
278+
if err != nil {
279+
return nil, nil, nil, nil, nil, nil, err
272280
}
273281

274282
leaderElectionClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "leader-election"))
275283
if err != nil {
276-
return nil, nil, nil, nil, nil, err
284+
return nil, nil, nil, nil, nil, nil, err
277285
}
278286

279287
mpiJobClientSet, err := mpijobclientset.NewForConfig(config)
280288
if err != nil {
281-
return nil, nil, nil, nil, nil, err
289+
return nil, nil, nil, nil, nil, nil, err
282290
}
283291

284292
var (
@@ -287,15 +295,15 @@ func createClientSets(
287295
)
288296
if gangSchedulingName == options.GangSchedulerVolcano {
289297
if volcanoClientSet, err = volcanoclient.NewForConfig(restclientset.AddUserAgent(config, "volcano")); err != nil {
290-
return nil, nil, nil, nil, nil, err
298+
return nil, nil, nil, nil, nil, nil, err
291299
}
292300
} else if len(gangSchedulingName) != 0 {
293301
if schedClientSet, err = schedclientset.NewForConfig(restclientset.AddUserAgent(config, "scheduler-plugins")); err != nil {
294-
return nil, nil, nil, nil, nil, err
302+
return nil, nil, nil, nil, nil, nil, err
295303
}
296304
}
297305

298-
return kubeClientSet, leaderElectionClientSet, mpiJobClientSet, volcanoClientSet, schedClientSet, nil
306+
return kubeClientSet, metricsClientSet, leaderElectionClientSet, mpiJobClientSet, volcanoClientSet, schedClientSet, nil
299307
}
300308

301309
func checkCRDExists(clientset mpijobclientset.Interface, namespace string) bool {

deploy/v2beta1/mpi-operator.yaml

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,15 @@ spec:
8080
additionalProperties:
8181
description: ReplicaSpec is a description of the replica
8282
properties:
83-
replicas:
83+
maxReplicas:
8484
description: |-
85-
Replicas is the desired number of replicas of the given template.
85+
MaxReplicas is the maximum number of replicas of the given template.
86+
If unspecified, defaults to 1.
87+
format: int32
88+
type: integer
89+
minReplicas:
90+
description: |-
91+
MinReplicas is the minimum number of replicas of the given template.
8692
If unspecified, defaults to 1.
8793
format: int32
8894
type: integer
@@ -7839,6 +7845,12 @@ spec:
78397845
Defaults to 1.
78407846
format: int32
78417847
type: integer
7848+
priority:
7849+
default: 0
7850+
description: |-
7851+
Specifies the job priority
7852+
format: int32
7853+
type: integer
78427854
sshAuthMountPath:
78437855
default: /root/.ssh
78447856
description: |-
@@ -8090,8 +8102,10 @@ rules:
80908102
- update
80918103
- apiGroups:
80928104
- ""
8105+
- metrics.k8s.io
80938106
resources:
80948107
- pods
8108+
- nodes
80958109
verbs:
80968110
- create
80978111
- get
@@ -8162,12 +8176,14 @@ rules:
81628176
- '*'
81638177
- apiGroups:
81648178
- scheduling.x-k8s.io
8179+
- metrics.k8s.io
81658180
resources:
81668181
- podgroups
81678182
verbs:
81688183
- '*'
81698184
- apiGroups:
81708185
- scheduling.k8s.io
8186+
- metrics.k8s.io
81718187
resources:
81728188
- priorityclasses
81738189
verbs:
@@ -8225,6 +8241,7 @@ spec:
82258241
- args:
82268242
- -alsologtostderr
82278243
- --lock-namespace=mpi-operator
8228-
image: mpioperator/mpi-operator:master
8244+
image: adityapb/mpi-operator:autoscale
8245+
imagePullPolicy: Always
82298246
name: mpi-operator
82308247
serviceAccountName: mpi-operator

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
k8s.io/apimachinery v0.29.4
1313
k8s.io/apiserver v0.29.4
1414
k8s.io/client-go v0.29.4
15+
k8s.io/metrics v0.29.4
1516
k8s.io/code-generator v0.29.4
1617
k8s.io/klog v1.0.0
1718
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,8 @@ k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0=
361361
k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo=
362362
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780=
363363
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA=
364+
k8s.io/metrics v0.29.4 h1:06sZ63/Kt9HEb5GP/1y6xbHDz6XkxnHpu949UdXfoXQ=
365+
k8s.io/metrics v0.29.4/go.mod h1:ZN9peB0nLTqPZuwQna8ZUrPFJQ0i8QNH4pqRJopS+9c=
364366
k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ=
365367
k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
366368
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I=

pkg/apis/kubeflow/v2beta1/default.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,11 @@ func setDefaultsTypeLauncher(spec *ReplicaSpec) {
3131
if spec.RestartPolicy == "" {
3232
spec.RestartPolicy = DefaultLauncherRestartPolicy
3333
}
34-
if spec.Replicas == nil {
35-
spec.Replicas = ptr.To[int32](1)
34+
if spec.MaxReplicas == nil {
35+
spec.MaxReplicas = ptr.To[int32](1)
36+
}
37+
if spec.MinReplicas == nil {
38+
spec.MinReplicas = ptr.To[int32](1)
3639
}
3740
}
3841

@@ -44,8 +47,11 @@ func setDefaultsTypeWorker(spec *ReplicaSpec) {
4447
if spec.RestartPolicy == "" {
4548
spec.RestartPolicy = DefaultRestartPolicy
4649
}
47-
if spec.Replicas == nil {
48-
spec.Replicas = ptr.To[int32](0)
50+
if spec.MaxReplicas == nil {
51+
spec.MaxReplicas = ptr.To[int32](0)
52+
}
53+
if spec.MinReplicas == nil {
54+
spec.MinReplicas = ptr.To[int32](1)
4955
}
5056
}
5157

pkg/apis/kubeflow/v2beta1/types.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,10 @@ type MPIJobSpec struct {
154154
// +kubebuilder:default:=1
155155
SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"`
156156

157+
// +optional
158+
// +kubebuilder:default:=0
159+
Priority *int32 `json:"priority,omitempty"`
160+
157161
// RunLauncherAsWorker indicates whether to run worker process in launcher
158162
// Defaults to false.
159163
// +optional
@@ -174,7 +178,7 @@ type MPIJobSpec struct {
174178

175179
// launcherCreationPolicy if WaitForWorkersReady, the launcher is created only after all workers are in Ready state. Defaults to WaitForWorkersReady.
176180
// +kubebuilder:validation:Enum:AtStartup;WaitForWorkersReady
177-
// +kubebuilder:default:=AtStartup
181+
// +kubebuilder:default:=WaitForWorkersReady
178182
LauncherCreationPolicy LauncherCreationPolicy `json:"launcherCreationPolicy,omitempty"`
179183

180184
// MPIImplementation is the MPI implementation.
@@ -329,7 +333,9 @@ const (
329333
type ReplicaSpec struct {
330334
// MaxReplicas is the maximum number of replicas of the given template.
331335
// If unspecified, defaults to 1.
332-
Replicas *int32 `json:"replicas,omitempty"`
336+
MaxReplicas *int32 `json:"maxReplicas,omitempty"`
337+
338+
MinReplicas *int32 `json:"minReplicas,omitempty"`
333339

334340
// Template is the object that describes the pod that
335341
// will be created for this replica. RestartPolicy in PodTemplateSpec

pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/apis/kubeflow/validation/validation.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ func validateMPIJobName(job *kubeflow.MPIJob) field.ErrorList {
5353
var allErrs field.ErrorList
5454
var replicas int32 = 1
5555
if workerSpec := job.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]; workerSpec != nil {
56-
if workerSpec.Replicas != nil && *workerSpec.Replicas > 0 {
57-
replicas = *workerSpec.Replicas
56+
if workerSpec.MaxReplicas != nil && *workerSpec.MaxReplicas > 0 {
57+
replicas = *workerSpec.MaxReplicas
5858
}
5959
}
6060
maximumPodHostname := fmt.Sprintf("%s-worker-%d", job.Name, replicas-1)
@@ -119,8 +119,8 @@ func validateLauncherReplicaSpec(spec *kubeflow.ReplicaSpec, path *field.Path) f
119119
return errs
120120
}
121121
errs = append(errs, validateReplicaSpec(spec, path)...)
122-
if spec.Replicas != nil && *spec.Replicas != 1 {
123-
errs = append(errs, field.Invalid(path.Child("replicas"), *spec.Replicas, "must be 1"))
122+
if spec.MaxReplicas != nil && *spec.MaxReplicas != 1 {
123+
errs = append(errs, field.Invalid(path.Child("replicas"), *spec.MaxReplicas, "must be 1"))
124124
}
125125
return errs
126126
}
@@ -131,15 +131,15 @@ func validateWorkerReplicaSpec(spec *kubeflow.ReplicaSpec, path *field.Path) fie
131131
return errs
132132
}
133133
errs = append(errs, validateReplicaSpec(spec, path)...)
134-
if spec.Replicas != nil && *spec.Replicas <= 0 {
135-
errs = append(errs, field.Invalid(path.Child("replicas"), *spec.Replicas, "must be greater than or equal to 1"))
134+
if spec.MaxReplicas != nil && *spec.MaxReplicas <= 0 {
135+
errs = append(errs, field.Invalid(path.Child("replicas"), *spec.MaxReplicas, "must be greater than or equal to 1"))
136136
}
137137
return errs
138138
}
139139

140140
func validateReplicaSpec(spec *kubeflow.ReplicaSpec, path *field.Path) field.ErrorList {
141141
var errs field.ErrorList
142-
if spec.Replicas == nil {
142+
if spec.MaxReplicas == nil {
143143
errs = append(errs, field.Required(path.Child("replicas"), "must define number of replicas"))
144144
}
145145
if !validRestartPolicies.Has(string(spec.RestartPolicy)) {

0 commit comments

Comments
 (0)