Skip to content

Commit 5990b05

Browse files
authored
[Feature] Add initializing timeout for RayService (#4143)
* [Feature] Add initializing timeout for RayService Signed-off-by: seanlaii <[email protected]> * fix lint * validate the annotation at the beginning * fix lint * add e2e * enhance comment * stop recreating raycluster after timeout * refactor --------- Signed-off-by: seanlaii <[email protected]>
1 parent 2acc219 commit 5990b05

File tree

7 files changed

+715
-13
lines changed

7 files changed

+715
-13
lines changed

ray-operator/apis/ray/v1/rayservice_types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ const (
200200

201201
const (
202202
RayServiceInitializing RayServiceConditionReason = "Initializing"
203+
RayServiceInitializingTimeout RayServiceConditionReason = "InitializingTimeout"
203204
ZeroServeEndpoints RayServiceConditionReason = "ZeroServeEndpoints"
204205
NonZeroServeEndpoints RayServiceConditionReason = "NonZeroServeEndpoints"
205206
BothActivePendingClustersExist RayServiceConditionReason = "BothActivePendingClustersExist"

ray-operator/controllers/ray/rayservice_controller.go

Lines changed: 127 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,24 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
127127
}
128128

129129
r.cleanUpServeConfigCache(ctx, rayServiceInstance)
130-
if err = r.cleanUpRayClusterInstance(ctx, rayServiceInstance); err != nil {
130+
hasRayClustersToClean, err := r.cleanUpRayClusterInstance(ctx, rayServiceInstance)
131+
if err != nil {
131132
return ctrl.Result{}, err
132133
}
133134

135+
// If the RayService has timed out during initialization, skip the rest of the reconciliation.
136+
// The service is in a terminal failure state - only cleanup (above) is needed.
137+
// The user must delete and recreate the RayService to recover.
138+
if isInitializingTimeout(rayServiceInstance) {
139+
// Requeue only if there are still RayClusters to clean up
140+
// This avoids unnecessary reconciliations after all resources are deleted
141+
if hasRayClustersToClean {
142+
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, nil
143+
}
144+
logger.Info("RayService in terminal failure state, all RayClusters cleaned up")
145+
return ctrl.Result{}, nil
146+
}
147+
134148
// Find active and pending ray cluster objects given current service name.
135149
var activeRayClusterInstance, pendingRayClusterInstance *rayv1.RayCluster
136150
if activeRayClusterInstance, pendingRayClusterInstance, err = r.reconcileRayCluster(ctx, rayServiceInstance); err != nil {
@@ -438,7 +452,9 @@ func (r *RayServiceReconciler) calculateStatus(
438452
}
439453

440454
rayServiceInstance.Status.NumServeEndpoints = int32(numServeEndpoints) //nolint:gosec // This is a false positive from gosec. See https://github.com/securego/gosec/issues/1212 for more details.
441-
calculateConditions(rayServiceInstance)
455+
456+
// Calculate conditions based on current state (endpoints, clusters, timeout, etc.)
457+
calculateConditions(ctx, r, rayServiceInstance)
442458

443459
// The definition of `ServiceStatus` is equivalent to the `RayServiceReady` condition
444460
rayServiceInstance.Status.ServiceStatus = rayv1.NotRunning
@@ -449,7 +465,7 @@ func (r *RayServiceReconciler) calculateStatus(
449465
return nil
450466
}
451467

452-
func calculateConditions(rayServiceInstance *rayv1.RayService) {
468+
func calculateConditions(ctx context.Context, r *RayServiceReconciler, rayServiceInstance *rayv1.RayService) {
453469
if rayServiceInstance.Status.Conditions == nil {
454470
rayServiceInstance.Status.Conditions = []metav1.Condition{}
455471
}
@@ -458,10 +474,15 @@ func calculateConditions(rayServiceInstance *rayv1.RayService) {
458474
setCondition(rayServiceInstance, rayv1.RayServiceReady, metav1.ConditionFalse, rayv1.RayServiceInitializing, message)
459475
setCondition(rayServiceInstance, rayv1.UpgradeInProgress, metav1.ConditionFalse, rayv1.RayServiceInitializing, message)
460476
}
477+
461478
if rayServiceInstance.Status.NumServeEndpoints > 0 {
462479
setCondition(rayServiceInstance, rayv1.RayServiceReady, metav1.ConditionTrue, rayv1.NonZeroServeEndpoints, "Number of serve endpoints is greater than 0")
463480
} else if meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RayServiceReady)) {
464481
setCondition(rayServiceInstance, rayv1.RayServiceReady, metav1.ConditionFalse, rayv1.ZeroServeEndpoints, "Number of serve endpoints dropped to 0")
482+
} else {
483+
// Check if initializing timeout has been exceeded
484+
// This runs after endpoint check, so if endpoints appear, they take priority
485+
markFailedOnInitializingTimeout(ctx, r, rayServiceInstance)
465486
}
466487

467488
activeClusterName := rayServiceInstance.Status.ActiveServiceStatus.RayClusterName
@@ -472,7 +493,8 @@ func calculateConditions(rayServiceInstance *rayv1.RayService) {
472493
setCondition(rayServiceInstance, rayv1.UpgradeInProgress, metav1.ConditionFalse, rayv1.NoPendingCluster, "Active Ray cluster exists and no pending Ray cluster")
473494
} else {
474495
cond := meta.FindStatusCondition(rayServiceInstance.Status.Conditions, string(rayv1.UpgradeInProgress))
475-
if cond == nil || cond.Reason != string(rayv1.RayServiceInitializing) {
496+
// Don't override the condition if RayService is initializing or has timed out
497+
if cond == nil || (cond.Reason != string(rayv1.RayServiceInitializing) && cond.Reason != string(rayv1.RayServiceInitializingTimeout)) {
476498
setCondition(rayServiceInstance, rayv1.UpgradeInProgress, metav1.ConditionUnknown, rayv1.NoActiveCluster, "No active Ray cluster exists, and the RayService is not initializing. Please open a GitHub issue in the KubeRay repository.")
477499
}
478500
}
@@ -893,23 +915,27 @@ func (r *RayServiceReconciler) reconcileRayCluster(ctx context.Context, rayServi
893915
}
894916

895917
// cleanUpRayClusterInstance cleans up all the dangling RayCluster instances that are owned by the RayService instance.
896-
func (r *RayServiceReconciler) cleanUpRayClusterInstance(ctx context.Context, rayServiceInstance *rayv1.RayService) error {
918+
// Returns true if there are still RayCluster instances that need to be cleaned up (either scheduled for deletion or waiting for deletion delay).
919+
func (r *RayServiceReconciler) cleanUpRayClusterInstance(ctx context.Context, rayServiceInstance *rayv1.RayService) (bool, error) {
897920
logger := ctrl.LoggerFrom(ctx)
898921
rayClusterList := rayv1.RayClusterList{}
899922

900923
var err error
901924
if err = r.List(ctx, &rayClusterList, common.RayServiceRayClustersAssociationOptions(rayServiceInstance).ToListOptions()...); err != nil {
902-
return err
925+
return false, err
903926
}
904927

905928
// Determine the ray cluster deletion delay seconds
906929
deletionDelay := RayClusterDeletionDelayDuration
907930
if rayServiceInstance.Spec.RayClusterDeletionDelaySeconds != nil {
908931
deletionDelay = time.Duration(*rayServiceInstance.Spec.RayClusterDeletionDelaySeconds) * time.Second
909932
}
933+
934+
hasRayClustersToClean := false
910935
// Clean up RayCluster instances. Each instance is deleted after the configured deletion delay.
911936
for _, rayClusterInstance := range rayClusterList.Items {
912937
if rayClusterInstance.Name != rayServiceInstance.Status.ActiveServiceStatus.RayClusterName && rayClusterInstance.Name != rayServiceInstance.Status.PendingServiceStatus.RayClusterName {
938+
hasRayClustersToClean = true
913939
cachedTimestamp, exists := r.RayClusterDeletionTimestamps.Get(rayClusterInstance.Name)
914940
if !exists {
915941
deletionTimestamp := metav1.Now().Add(deletionDelay)
@@ -932,15 +958,15 @@ func (r *RayServiceReconciler) cleanUpRayClusterInstance(ctx context.Context, ra
932958
logger.Info("reconcileRayCluster", "delete Ray cluster", rayClusterInstance.Name, "reason", reasonForDeletion)
933959
if err := r.Delete(ctx, &rayClusterInstance, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil {
934960
r.Recorder.Eventf(rayServiceInstance, corev1.EventTypeWarning, string(utils.FailedToDeleteRayCluster), "Failed to delete the RayCluster %s/%s: %v", rayClusterInstance.Namespace, rayClusterInstance.Name, err)
935-
return err
961+
return false, err
936962
}
937963
r.Recorder.Eventf(rayServiceInstance, corev1.EventTypeNormal, string(utils.DeletedRayCluster), "Deleted the RayCluster %s/%s", rayClusterInstance.Namespace, rayClusterInstance.Name)
938964
}
939965
}
940966
}
941967
}
942968

943-
return nil
969+
return hasRayClustersToClean, nil
944970
}
945971

946972
func (r *RayServiceReconciler) getRayClusterByNamespacedName(ctx context.Context, clusterKey client.ObjectKey) (*rayv1.RayCluster, error) {
@@ -1713,6 +1739,99 @@ func (r *RayServiceReconciler) isHeadPodRunningAndReady(ctx context.Context, ins
17131739
return utils.IsRunningAndReady(headPod), nil
17141740
}
17151741

1742+
// getInitializingTimeout parses the initializing timeout annotation from RayService.
1743+
// Returns (timeout, true) if valid. Accepts Go duration format (e.g., "5m", "1h") or integer seconds.
1744+
// The annotation is assumed to be already validated by ValidateRayServiceMetadata.
1745+
// If the annotation is absent, returns (0, false).
1746+
func getInitializingTimeout(rs *rayv1.RayService) (time.Duration, bool) {
1747+
if rs.Annotations == nil {
1748+
return 0, false
1749+
}
1750+
1751+
timeoutStr, exists := rs.Annotations[utils.RayServiceInitializingTimeoutAnnotation]
1752+
if !exists || timeoutStr == "" {
1753+
return 0, false
1754+
}
1755+
1756+
// Try parsing as Go duration first (e.g., "30m", "1h")
1757+
// Validation already ensures this is valid, so we can ignore errors
1758+
if timeout, err := time.ParseDuration(timeoutStr); err == nil {
1759+
return timeout, true
1760+
}
1761+
1762+
// Try parsing as integer seconds
1763+
// Validation already ensures this is valid if ParseDuration failed
1764+
if seconds, err := strconv.Atoi(timeoutStr); err == nil {
1765+
return time.Duration(seconds) * time.Second, true
1766+
}
1767+
1768+
// This should never happen since validation ensures correctness,
1769+
// but we handle it gracefully by returning false
1770+
return 0, false
1771+
}
1772+
1773+
// isInitializingTimeout returns true if RayServiceReady is False with Reason=InitializingTimeout.
1774+
// Once a RayService has timed out, it remains in a terminal failure state regardless of generation changes.
1775+
func isInitializingTimeout(rs *rayv1.RayService) bool {
1776+
readyCond := meta.FindStatusCondition(rs.Status.Conditions, string(rayv1.RayServiceReady))
1777+
if readyCond == nil {
1778+
return false
1779+
}
1780+
1781+
return readyCond.Status == metav1.ConditionFalse &&
1782+
readyCond.Reason == string(rayv1.RayServiceInitializingTimeout)
1783+
}
1784+
1785+
// markFailedOnInitializingTimeout checks if the RayService has been initializing for too long.
1786+
// If timeout is configured and exceeded, it marks the service as failed and triggers cleanup.
1787+
func markFailedOnInitializingTimeout(ctx context.Context, r *RayServiceReconciler, rs *rayv1.RayService) {
1788+
logger := ctrl.LoggerFrom(ctx)
1789+
1790+
// Skip if no timeout is configured
1791+
timeout, ok := getInitializingTimeout(rs)
1792+
if !ok {
1793+
return
1794+
}
1795+
1796+
// Check if currently in Initializing state
1797+
readyCond := meta.FindStatusCondition(rs.Status.Conditions, string(rayv1.RayServiceReady))
1798+
if readyCond == nil {
1799+
return
1800+
}
1801+
1802+
if readyCond.Status != metav1.ConditionFalse || readyCond.Reason != string(rayv1.RayServiceInitializing) {
1803+
// Not in Initializing state
1804+
return
1805+
}
1806+
1807+
// Check if timeout has been exceeded
1808+
timeInInitializing := time.Since(readyCond.LastTransitionTime.Time)
1809+
if timeInInitializing < timeout {
1810+
// Still within timeout
1811+
return
1812+
}
1813+
1814+
// Timeout exceeded - mark as failed
1815+
logger.Info("RayService initializing timeout exceeded",
1816+
"timeout", timeout,
1817+
"timeInInitializing", timeInInitializing,
1818+
"generation", rs.Generation)
1819+
1820+
// Clear cluster names to trigger cleanup
1821+
rs.Status.ActiveServiceStatus.RayClusterName = ""
1822+
rs.Status.PendingServiceStatus.RayClusterName = ""
1823+
1824+
// Set condition to Failed with InitializingTimeout reason
1825+
message := fmt.Sprintf("RayService failed to become ready within the configured timeout of %s. Time spent initializing: %s",
1826+
timeout, timeInInitializing)
1827+
setCondition(rs, rayv1.RayServiceReady, metav1.ConditionFalse, rayv1.RayServiceInitializingTimeout, message)
1828+
1829+
// Emit warning event
1830+
r.Recorder.Eventf(rs, corev1.EventTypeWarning, string(utils.RayServiceInitializingTimeout),
1831+
"RayService initializing timeout exceeded after %s (configured timeout: %s)",
1832+
timeInInitializing, timeout)
1833+
}
1834+
17161835
// reconcilePerClusterServeService reconciles a load-balancing serve service for a given RayCluster.
17171836
func (r *RayServiceReconciler) reconcilePerClusterServeService(ctx context.Context, rayServiceInstance *rayv1.RayService, rayClusterInstance *rayv1.RayCluster) error {
17181837
if rayClusterInstance == nil {

0 commit comments

Comments
 (0)