Skip to content

Commit 0106303

Browse files
dushulin, ShulinDu, Future-Outlier, rueian, andrewsykim
authored
[RayJob] avoid RayCluster resource leak in k8s job mode(#3903) (#4080)
* fix[rayjob]: RayJobDeploymentStatus is still Running when head was killed(#0)
* nit
  Signed-off-by: Future-Outlier <[email protected]>
* update edge case for sidecar mode
  Signed-off-by: Future-Outlier <[email protected]>
* fix integration test by dsl
  Signed-off-by: Future-Outlier <[email protected]>
* update
  Signed-off-by: Future-Outlier <[email protected]>
* update
  Signed-off-by: Future-Outlier <[email protected]>
* update
  Signed-off-by: Future-Outlier <[email protected]>
* fix test
  Signed-off-by: Future-Outlier <[email protected]>
* update rueian and andrew's advice
  Signed-off-by: Future-Outlier <[email protected]>
  Co-authored-by: Rueian <[email protected]>
  Co-authored-by: Andrew Sy Kim <[email protected]>
---------
Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: ShulinDu <[email protected]>
Co-authored-by: Future-Outlier <[email protected]>
Co-authored-by: Rueian <[email protected]>
Co-authored-by: Andrew Sy Kim <[email protected]>
1 parent 7dcdb26 commit 0106303

File tree

3 files changed

+105
-7
lines changed

3 files changed

+105
-7
lines changed

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -279,14 +279,23 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
279279
jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
280280
if err != nil {
281281
// If the Ray job was not found, GetJobInfo returns a BadRequest error.
282-
if rayJobInstance.Spec.SubmissionMode == rayv1.HTTPMode && errors.IsBadRequest(err) {
283-
logger.Info("The Ray job was not found. Submit a Ray job via an HTTP request.", "JobId", rayJobInstance.Status.JobId)
284-
if _, err := rayDashboardClient.SubmitJob(ctx, rayJobInstance); err != nil {
285-
logger.Error(err, "Failed to submit the Ray job", "JobId", rayJobInstance.Status.JobId)
286-
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
282+
if errors.IsBadRequest(err) {
283+
if rayJobInstance.Spec.SubmissionMode == rayv1.HTTPMode {
284+
logger.Info("The Ray job was not found. Submit a Ray job via an HTTP request.", "JobId", rayJobInstance.Status.JobId)
285+
if _, err := rayDashboardClient.SubmitJob(ctx, rayJobInstance); err != nil {
286+
logger.Error(err, "Failed to submit the Ray job", "JobId", rayJobInstance.Status.JobId)
287+
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
288+
}
289+
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
290+
}
291+
if isSubmitterFinished {
292+
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed
293+
rayJobInstance.Status.Reason = rayv1.AppFailed
294+
rayJobInstance.Status.Message = "Submitter completed but Ray job not found in RayCluster."
295+
break
287296
}
288-
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
289297
}
298+
290299
logger.Error(err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId)
291300
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
292301
}
@@ -1050,7 +1059,11 @@ func (r *RayJobReconciler) checkSubmitterAndUpdateStatusIfNeeded(ctx context.Con
10501059
}
10511060

10521061
if headPod == nil {
1053-
logger.Info("Ray head pod not found, skipping sidecar container status check")
1062+
// If head pod is deleted, mark the RayJob as failed
1063+
shouldUpdate = true
1064+
rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed
1065+
rayJob.Status.Reason = rayv1.AppFailed
1066+
rayJob.Status.Message = "Ray head pod not found."
10541067
return
10551068
}
10561069

ray-operator/test/e2erayjob/rayjob_sidecar_mode_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,4 +155,47 @@ env_vars:
155155
g.Expect(err).NotTo(HaveOccurred())
156156
LogWithTimestamp(test.T(), "Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
157157
})
158+
159+
test.T().Run("RayJob fails when head Pod is deleted when job is running", func(_ *testing.T) {
160+
rayJobAC := rayv1ac.RayJob("delete-head-after-submit-sidecar-mode", namespace.Name).
161+
WithSpec(rayv1ac.RayJobSpec().
162+
WithSubmissionMode(rayv1.SidecarMode).
163+
WithRayClusterSpec(NewRayClusterSpec()).
164+
WithEntrypoint("python -c \"import time; time.sleep(60)\"").
165+
WithShutdownAfterJobFinishes(true))
166+
167+
rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
168+
g.Expect(err).NotTo(HaveOccurred())
169+
LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
170+
171+
// Wait until the RayJob's job status transitions to Running
172+
LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to be 'Running'", rayJob.Namespace, rayJob.Name)
173+
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
174+
Should(WithTransform(RayJobStatus, Equal(rayv1.JobStatusRunning)))
175+
176+
// Fetch RayCluster and delete the head Pod
177+
rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name)
178+
g.Expect(err).NotTo(HaveOccurred())
179+
rayCluster, err := GetRayCluster(test, rayJob.Namespace, rayJob.Status.RayClusterName)
180+
g.Expect(err).NotTo(HaveOccurred())
181+
headPod, err := GetHeadPod(test, rayCluster)
182+
g.Expect(err).NotTo(HaveOccurred())
183+
LogWithTimestamp(test.T(), "Deleting head Pod %s/%s for RayCluster %s", headPod.Namespace, headPod.Name, rayCluster.Name)
184+
err = test.Client().Core().CoreV1().Pods(headPod.Namespace).Delete(test.Ctx(), headPod.Name, metav1.DeleteOptions{})
185+
g.Expect(err).NotTo(HaveOccurred())
186+
187+
// After head pod deletion, controller should mark RayJob as Failed with a specific message
188+
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
189+
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed)))
190+
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
191+
Should(WithTransform(RayJobReason, Equal(rayv1.AppFailed)))
192+
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
193+
Should(WithTransform(func(job *rayv1.RayJob) string { return job.Status.Message },
194+
Equal("Ray head pod not found.")))
195+
196+
// Cleanup
197+
err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{})
198+
g.Expect(err).NotTo(HaveOccurred())
199+
LogWithTimestamp(test.T(), "Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
200+
})
158201
}

ray-operator/test/e2erayjob/rayjob_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,48 @@ env_vars:
274274
To(WithTransform(RayJobReason, Equal(rayv1.DeadlineExceeded)))
275275
})
276276

277+
test.T().Run("RayJob fails when head Pod is deleted when job is running", func(_ *testing.T) {
278+
rayJobAC := rayv1ac.RayJob("delete-head-after-submit", namespace.Name).
279+
WithSpec(rayv1ac.RayJobSpec().
280+
WithRayClusterSpec(NewRayClusterSpec()).
281+
WithEntrypoint("python -c \"import time; time.sleep(60)\"").
282+
WithShutdownAfterJobFinishes(true))
283+
284+
rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
285+
g.Expect(err).NotTo(HaveOccurred())
286+
LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
287+
288+
// Wait until the RayJob's job status transitions to Running
289+
LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to be 'Running'", rayJob.Namespace, rayJob.Name)
290+
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
291+
Should(WithTransform(RayJobStatus, Equal(rayv1.JobStatusRunning)))
292+
293+
// Fetch RayCluster and delete the head Pod
294+
rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name)
295+
g.Expect(err).NotTo(HaveOccurred())
296+
rayCluster, err := GetRayCluster(test, rayJob.Namespace, rayJob.Status.RayClusterName)
297+
g.Expect(err).NotTo(HaveOccurred())
298+
headPod, err := GetHeadPod(test, rayCluster)
299+
g.Expect(err).NotTo(HaveOccurred())
300+
LogWithTimestamp(test.T(), "Deleting head Pod %s/%s for RayCluster %s", headPod.Namespace, headPod.Name, rayCluster.Name)
301+
err = test.Client().Core().CoreV1().Pods(headPod.Namespace).Delete(test.Ctx(), headPod.Name, metav1.DeleteOptions{})
302+
g.Expect(err).NotTo(HaveOccurred())
303+
304+
// After head pod deletion, controller should mark RayJob as Failed with a specific message
305+
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
306+
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed)))
307+
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
308+
Should(WithTransform(RayJobReason, Equal(rayv1.AppFailed)))
309+
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
310+
Should(WithTransform(func(job *rayv1.RayJob) string { return job.Status.Message },
311+
Equal("Submitter completed but Ray job not found in RayCluster.")))
312+
313+
// Cleanup
314+
err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{})
315+
g.Expect(err).NotTo(HaveOccurred())
316+
LogWithTimestamp(test.T(), "Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
317+
})
318+
277319
test.T().Run("RayJob should be created, but not updated when managed externally", func(_ *testing.T) {
278320
// RayJob
279321
rayJobAC := rayv1ac.RayJob("managed-externally", namespace.Name).

0 commit comments

Comments
 (0)