
Commit 0186159

julienmancuso and hhzhang16 authored and committed
fix: Implement scaling Grove subresources (#2531)
Signed-off-by: Hannah Zhang <[email protected]>
1 parent: 236cca6 · commit: 0186159

File tree: 8 files changed, +243 / -14 lines


deploy/cloud/helm/platform/Chart.yaml

Lines changed: 2 additions & 2 deletions
@@ -19,11 +19,11 @@ maintainers:
     url: https://www.nvidia.com
 description: A Helm chart for NVIDIA Dynamo Platform.
 type: application
-version: 0.4.0
+version: 0.4.1
 home: https://nvidia.com
 dependencies:
   - name: dynamo-operator
-    version: 0.4.0
+    version: 0.4.1
    repository: file://components/operator
    condition: dynamo-operator.enabled
  - name: nats

deploy/cloud/helm/platform/components/operator/Chart.yaml

Lines changed: 2 additions & 2 deletions
@@ -27,9 +27,9 @@ type: application
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
 # Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 0.4.0
+version: 0.4.1
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application. Versions are not expected to
 # follow Semantic Versioning. They should reflect the version the application is using.
 # It is recommended to use it with quotes.
-appVersion: "0.4.0"
+appVersion: "0.4.1"

deploy/cloud/helm/platform/components/operator/templates/manager-rbac.yaml

Lines changed: 9 additions & 0 deletions
@@ -128,6 +128,15 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - grove.io
+  resources:
+  - podcliques/scale
+  - podcliquescalinggroups/scale
+  verbs:
+  - get
+  - patch
+  - update
 - apiGroups:
   - apps
   resources:

deploy/cloud/operator/cmd/main.go

Lines changed: 40 additions & 0 deletions
@@ -31,9 +31,13 @@ import (
 	clientv3 "go.etcd.io/etcd/client/v3"
 	corev1 "k8s.io/api/core/v1"
 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+	"k8s.io/client-go/discovery/cached/memory"
+	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	_ "k8s.io/client-go/plugin/pkg/client/auth"
+	"k8s.io/client-go/restmapper"
+	"k8s.io/client-go/scale"
 	k8sCache "k8s.io/client-go/tools/cache"
 	"sigs.k8s.io/controller-runtime/pkg/cache"

@@ -65,6 +69,34 @@
 	setupLog = ctrl.Log.WithName("setup")
 )

+func createScalesGetter(mgr ctrl.Manager) (scale.ScalesGetter, error) {
+	config := mgr.GetConfig()
+
+	// Create kubernetes client for discovery
+	kubeClient, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		return nil, err
+	}
+
+	// Create cached discovery client
+	cachedDiscovery := memory.NewMemCacheClient(kubeClient.Discovery())
+
+	// Create REST mapper
+	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscovery)
+
+	scalesGetter, err := scale.NewForConfig(
+		config,
+		restMapper,
+		dynamic.LegacyAPIPathResolverFunc,
+		scale.NewDiscoveryScaleKindResolver(cachedDiscovery),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	return scalesGetter, nil
+}
+
 func init() {
 	utilruntime.Must(clientgoscheme.AddToScheme(scheme))

@@ -321,11 +353,19 @@ func main() {
 		setupLog.Error(err, "unable to create controller", "controller", "DynamoComponentDeployment")
 		os.Exit(1)
 	}
+	// Create scale client for Grove resource scaling
+	scaleClient, err := createScalesGetter(mgr)
+	if err != nil {
+		setupLog.Error(err, "unable to create scale client")
+		os.Exit(1)
+	}
+
 	if err = (&controller.DynamoGraphDeploymentReconciler{
 		Client:                mgr.GetClient(),
 		Recorder:              mgr.GetEventRecorderFor("dynamographdeployment"),
 		Config:                ctrlConfig,
 		DockerSecretRetriever: dockerSecretRetriever,
+		ScaleClient:           scaleClient,
 	}).SetupWithManager(mgr); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "DynamoGraphDeployment")
 		os.Exit(1)
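
For orientation, a minimal sketch of what the ScalesGetter built above is used for at runtime. The namespace "dynamo-cloud", the resource name "my-graph-0-worker", and the ctx/schema/metav1 references are hypothetical here; the GVR matches the Grove resources wired up in the controller further down this commit.

    // Read the scale subresource of a Grove PodClique through the generic scale client.
    gvr := schema.GroupVersionResource{Group: "grove.io", Version: "v1alpha1", Resource: "podcliques"}
    current, err := scaleClient.Scales("dynamo-cloud").Get(ctx, gvr.GroupResource(), "my-graph-0-worker", metav1.GetOptions{})
    if err == nil {
        setupLog.Info("current replicas", "replicas", current.Spec.Replicas)
    }

The cached discovery client and deferred REST mapper let the scale client look up which API group/version serves a resource's scale subresource instead of hard-coding it, with discovery results cached between calls.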

deploy/cloud/operator/config/rbac/role.yaml

Lines changed: 9 additions & 0 deletions
@@ -83,6 +83,15 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - grove.io
+  resources:
+  - podcliques/scale
+  - podcliquescalinggroups/scale
+  verbs:
+  - get
+  - patch
+  - update
 - apiGroups:
   - grove.io
   resources:

deploy/cloud/operator/internal/controller/dynamographdeployment_controller.go

Lines changed: 103 additions & 8 deletions
@@ -20,12 +20,17 @@ package controller
 import (
 	"context"
 	"fmt"
+	"strings"

 	grovev1alpha1 "github.com/NVIDIA/grove/operator/api/core/v1alpha1"
+	"k8s.io/apimachinery/pkg/api/errors"
+
 	networkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
 	corev1 "k8s.io/api/core/v1"
 	networkingv1 "k8s.io/api/networking/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/scale"
 	"k8s.io/client-go/tools/record"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/builder"

@@ -50,6 +55,20 @@ const (
 	PendingState State = "pending"
 )

+var (
+	// Grove GroupVersionResources for scaling operations
+	podCliqueGVR = schema.GroupVersionResource{
+		Group:    "grove.io",
+		Version:  "v1alpha1",
+		Resource: "podcliques",
+	}
+	podCliqueScalingGroupGVR = schema.GroupVersionResource{
+		Group:    "grove.io",
+		Version:  "v1alpha1",
+		Resource: "podcliquescalinggroups",
+	}
+)
+
 type etcdStorage interface {
 	DeleteKeys(ctx context.Context, prefix string) error
 }

@@ -60,12 +79,15 @@ type DynamoGraphDeploymentReconciler struct {
 	Config                commonController.Config
 	Recorder              record.EventRecorder
 	DockerSecretRetriever dockerSecretRetriever
+	ScaleClient           scale.ScalesGetter
 }

 // +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/status,verbs=get;update;patch
 // +kubebuilder:rbac:groups=nvidia.com,resources=dynamographdeployments/finalizers,verbs=update
 // +kubebuilder:rbac:groups=grove.io,resources=podgangsets,verbs=get;list;watch;create;update;patch;delete
+// +kubebuilder:rbac:groups=grove.io,resources=podcliques/scale,verbs=get;update;patch
+// +kubebuilder:rbac:groups=grove.io,resources=podcliquescalinggroups/scale,verbs=get;update;patch

 // Reconcile is part of the main kubernetes reconciliation loop which aims to
 // move the current state of the cluster closer to the desired state.

@@ -156,6 +178,80 @@ func (r *DynamoGraphDeploymentReconciler) reconcileResources(ctx context.Context

 }

+// scaleGroveResource scales a Grove resource using the generic scaling function
+func (r *DynamoGraphDeploymentReconciler) scaleGroveResource(ctx context.Context, resourceName, namespace string, newReplicas int32, resourceType string) error {
+	logger := log.FromContext(ctx)
+	// Determine the GroupVersionResource based on resource type
+	var gvr schema.GroupVersionResource
+	switch resourceType {
+	case "PodClique":
+		gvr = podCliqueGVR
+	case "PodCliqueScalingGroup":
+		gvr = podCliqueScalingGroupGVR
+	default:
+		return fmt.Errorf("unsupported Grove resource type: %s", resourceType)
+	}
+
+	// Use the generic scaling function
+	err := commonController.ScaleResource(ctx, r.ScaleClient, gvr, namespace, resourceName, newReplicas)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			// Resource doesn't exist yet - this is normal during initial creation when Grove is still creating the resources asynchronously
+			logger.V(1).Info("Grove resource not found yet, skipping scaling for now - will retry on next reconciliation", "gvr", gvr, "name", resourceName, "namespace", namespace)
+			return nil
+		}
+	}
+	return err
+}
+
+// reconcileGroveScaling handles scaling operations for Grove resources based on service replica changes
+func (r *DynamoGraphDeploymentReconciler) reconcileGroveScaling(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) error {
+	logger := log.FromContext(ctx)
+	logger.V(1).Info("Reconciling Grove scaling operations")
+
+	replicaIndex := 0
+	for serviceName, component := range dynamoDeployment.Spec.Services {
+		// Skip if replicas are not specified
+		if component.Replicas == nil {
+			continue
+		}
+
+		numberOfNodes := component.GetNumberOfNodes()
+		isMultinode := numberOfNodes > 1
+
+		if isMultinode {
+			// Scale PodCliqueScalingGroup for multinode services
+			// Grove naming pattern: {DGD.name}-{replicaIndex}-{serviceName}
+			resourceName := fmt.Sprintf("%s-%d-%s", dynamoDeployment.Name, replicaIndex, strings.ToLower(serviceName))
+			err := r.scaleGroveResource(ctx,
+				resourceName,
+				dynamoDeployment.Namespace,
+				*component.Replicas,
+				"PodCliqueScalingGroup")
+			if err != nil {
+				logger.Error(err, "Failed to scale PodCliqueScalingGroup", "serviceName", serviceName, "resourceName", resourceName, "replicas", *component.Replicas)
+				return fmt.Errorf("failed to scale PodCliqueScalingGroup %s: %w", resourceName, err)
+			}
+		} else {
+			// Scale individual PodClique for single-node services
+			// Grove naming pattern: {DGD.name}-{replicaIndex}-{serviceName}
+			resourceName := fmt.Sprintf("%s-%d-%s", dynamoDeployment.Name, replicaIndex, strings.ToLower(serviceName))
+			err := r.scaleGroveResource(ctx,
+				resourceName,
+				dynamoDeployment.Namespace,
+				*component.Replicas,
+				"PodClique")
+			if err != nil {
+				logger.Error(err, "Failed to scale PodClique", "serviceName", serviceName, "resourceName", resourceName, "replicas", *component.Replicas)
+				return fmt.Errorf("failed to scale PodClique %s: %w", resourceName, err)
+			}
+		}
+	}
+
+	logger.V(1).Info("Successfully reconciled Grove scaling operations")
+	return nil
+}
+
 func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Context, dynamoDeployment *nvidiacomv1alpha1.DynamoGraphDeployment) (State, Reason, Message, error) {
 	logger := log.FromContext(ctx)
 	// generate the dynamoComponentsDeployments from the config

@@ -177,6 +273,13 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
 		}
 		return false
 	})
+
+	// Handle Grove scaling operations after structural changes
+	if err := r.reconcileGroveScaling(ctx, dynamoDeployment); err != nil {
+		logger.Error(err, "failed to reconcile Grove scaling")
+		return FailedState, "grove_scaling_failed", Message(err.Error()), err
+	}
+
 	resources := []Resource{groveGangSetAsResource}
 	for componentName, component := range dynamoDeployment.Spec.Services {
 		if component.ComponentType == consts.ComponentTypeFrontend {

@@ -203,10 +306,6 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
 			ingressSpec = *component.Ingress
 		}
 		mainComponentIngress := dynamo.GenerateComponentIngress(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec)
-		if err != nil {
-			logger.Error(err, "failed to generate the main component ingress")
-			return "", "", "", fmt.Errorf("failed to generate the main component ingress: %w", err)
-		}
 		_, syncedMainComponentIngress, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1.Ingress, bool, error) {
 			if !ingressSpec.Enabled || ingressSpec.IngressControllerClassName == nil {
 				logger.Info("Ingress is not enabled")

@@ -224,10 +323,6 @@ func (r *DynamoGraphDeploymentReconciler) reconcileGroveResources(ctx context.Co
 		// generate the main component virtual service
 		if r.Config.IngressConfig.UseVirtualService() {
 			mainComponentVirtualService := dynamo.GenerateComponentVirtualService(ctx, dynamo.GetDynamoComponentName(dynamoDeployment, componentName), dynamoDeployment.Namespace, ingressSpec)
-			if err != nil {
-				logger.Error(err, "failed to generate the main component virtual service")
-				return "", "", "", fmt.Errorf("failed to generate the main component virtual service: %w", err)
-			}
 			_, syncedMainComponentVirtualService, err := commonController.SyncResource(ctx, r, dynamoDeployment, func(ctx context.Context) (*networkingv1beta1.VirtualService, bool, error) {
 				if !ingressSpec.IsVirtualServiceEnabled() {
 					logger.Info("VirtualService is not enabled")
Lines changed: 76 additions & 0 deletions
@@ -0,0 +1,76 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller_common
+
+import (
+	"context"
+	"fmt"
+
+	autoscalingv1 "k8s.io/api/autoscaling/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/scale"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+)
+
+// ScaleResource scales any Kubernetes resource using the Scale subresource
+func ScaleResource(ctx context.Context, scaleClient scale.ScalesGetter, gvr schema.GroupVersionResource, namespace, name string, replicas int32) error {
+	logger := log.FromContext(ctx)
+	logger.Info("Scaling resource", "gvr", gvr, "name", name, "namespace", namespace, "replicas", replicas)
+
+	if scaleClient == nil {
+		logger.Error(nil, "Scale client is nil")
+		return fmt.Errorf("scale client is nil")
+	}
+
+	currentScale, err := scaleClient.Scales(namespace).Get(ctx, gvr.GroupResource(), name, metav1.GetOptions{})
+	if err != nil {
+		logger.Error(err, "Failed to get current scale - resource may not support scale subresource", "gvr", gvr, "name", name, "namespace", namespace, "groupResource", gvr.GroupResource())
+		return fmt.Errorf("failed to get current scale for %s %s (resource may not support scale subresource): %w", gvr.Resource, name, err)
+	}
+
+	if replicas < 0 {
+		return fmt.Errorf("replicas must be >= 0, got %d", replicas)
+	}
+
+	if currentScale.Spec.Replicas == replicas {
+		logger.V(1).Info("Resource already at desired replica count", "gvr", gvr, "name", name, "replicas", replicas)
+		return nil
+	}
+
+	scaleObj := &autoscalingv1.Scale{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            name,
+			Namespace:       namespace,
+			ResourceVersion: currentScale.ObjectMeta.ResourceVersion,
+		},
+		Spec: autoscalingv1.ScaleSpec{
+			Replicas: replicas,
+		},
+	}
+
+	logger.V(1).Info("Updating scale", "gvr", gvr, "name", name, "newReplicas", replicas)
+	_, err = scaleClient.Scales(namespace).Update(ctx, gvr.GroupResource(), scaleObj, metav1.UpdateOptions{})
+	if err != nil {
+		logger.Error(err, "Failed to update scale", "gvr", gvr, "name", name, "replicas", replicas)
+		return fmt.Errorf("failed to update scale for %s %s: %w", gvr.Resource, name, err)
+	}
+
+	logger.Info("Successfully scaled resource", "gvr", gvr, "name", name, "oldReplicas", currentScale.Spec.Replicas, "newReplicas", replicas)
+	return nil
+}
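
As a usage sketch (namespace, resource name, and replica count hypothetical), this is the shape of the call the controller above makes through this helper; any resource whose CRD exposes a scale subresource can be targeted the same way:

    gvr := schema.GroupVersionResource{Group: "grove.io", Version: "v1alpha1", Resource: "podcliquescalinggroups"}
    if err := controller_common.ScaleResource(ctx, scaleClient, gvr, "dynamo-cloud", "my-graph-0-prefill", 3); err != nil {
        // The controller tolerates NotFound because Grove may still be creating the resource asynchronously.
        log.FromContext(ctx).Error(err, "failed to scale Grove resource")
    }

The helper reads the current autoscaling/v1 Scale, skips the update when the replica count already matches, and carries the ResourceVersion forward so the update is rejected on a conflicting concurrent change.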

deploy/helm/chart/Chart.yaml

Lines changed: 2 additions & 2 deletions
@@ -17,5 +17,5 @@ apiVersion: v2
 name: dynamo-graph
 description: A Helm chart to deploy a Dynamo graph on Kubernetes
 type: application
-version: 0.4.0
-appVersion: 0.4.0
+version: 0.4.1
+appVersion: 0.4.1
