Skip to content

Commit a361dc3

Browse files
authored
[Apiserver] Use Eventually from Gomega instead of wait from apimachinery (#3433)
1 parent 5265ee0 commit a361dc3

9 files changed

+137
-194
lines changed

apiserver/go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ require (
2727
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
2828
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
2929
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0
30+
github.com/onsi/gomega v1.36.2
3031
github.com/rs/zerolog v1.34.0
3132
google.golang.org/genproto/googleapis/rpc v0.0.0-20240826202546-f6391c0de4c7
3233
k8s.io/utils v0.0.0-20241210054802-24370beab758

apiserver/test/e2e/cluster_server_autoscaler_e2e_test.go

+17-17
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
package e2e
22

33
import (
4-
"context"
54
"testing"
6-
"time"
7-
8-
"github.com/stretchr/testify/require"
9-
"k8s.io/apimachinery/pkg/util/wait"
105

116
api "github.com/ray-project/kuberay/proto/go_client"
127

138
rayv1api "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
9+
10+
"github.com/onsi/gomega"
11+
"github.com/stretchr/testify/require"
1412
)
1513

1614
// TestCreateClusterAutoscalerEndpoint sequentially iterates over the create cluster endpoint
1715
// with valid and invalid requests
1816
func TestCreateClusterAutoscaler(t *testing.T) {
17+
g := gomega.NewWithT(t)
18+
1919
tCtx, err := NewEnd2EndTestingContext(t)
2020
require.NoError(t, err, "No error expected when creating testing context")
2121

@@ -120,18 +120,18 @@ func TestCreateClusterAutoscaler(t *testing.T) {
120120
require.NoError(t, err, "No error expected")
121121
require.Nil(t, actualRPCStatus, "No RPC status expected")
122122
require.NotNil(t, actualJob, "A job is expected")
123-
waitForRayJob(t, tCtx, createActorRequest.Job.Name, []rayv1api.JobStatus{rayv1api.JobStatusSucceeded})
123+
waitForRayJobInExpectedStatuses(t, tCtx, createActorRequest.Job.Name, []rayv1api.JobStatus{rayv1api.JobStatusSucceeded})
124124

125125
// worker pod should be created as part of job execution
126-
err = wait.PollUntilContextTimeout(tCtx.ctx, 500*time.Millisecond, 3*time.Minute, false, func(_ context.Context) (done bool, err error) {
126+
g.Eventually(func() int32 {
127127
rayCluster, err := tCtx.GetRayClusterByName(actualCluster.Name)
128128
if err != nil {
129-
return true, err
129+
t.Logf("Error getting ray cluster: %v", err)
130+
return -1 // Return -1 to indicate error condition
130131
}
131132
t.Logf("Found ray cluster with %d available worker replicas", rayCluster.Status.AvailableWorkerReplicas)
132-
return rayCluster.Status.AvailableWorkerReplicas == 1, nil
133-
})
134-
require.NoError(t, err, "No error expected")
133+
return rayCluster.Status.AvailableWorkerReplicas
134+
}, TestTimeoutMedium, TestPollingInterval).Should(gomega.Equal(int32(1)))
135135
// Delete actor
136136
deleteActorRequest := api.CreateRayJobRequest{
137137
Namespace: tCtx.GetNamespaceName(),
@@ -149,15 +149,15 @@ func TestCreateClusterAutoscaler(t *testing.T) {
149149
require.NoError(t, err, "No error expected")
150150
require.Nil(t, actualRPCStatus, "No RPC status expected")
151151
require.NotNil(t, actualJob, "A job is expected")
152-
waitForRayJob(t, tCtx, createActorRequest.Job.Name, []rayv1api.JobStatus{rayv1api.JobStatusSucceeded})
153-
154-
err = wait.PollUntilContextTimeout(tCtx.ctx, 500*time.Millisecond, 3*time.Minute, false, func(_ context.Context) (done bool, err error) {
152+
waitForRayJobInExpectedStatuses(t, tCtx, createActorRequest.Job.Name, []rayv1api.JobStatus{rayv1api.JobStatusSucceeded})
153+
g.Eventually(func() int32 {
155154
rayCluster, err := tCtx.GetRayClusterByName(actualCluster.Name)
156155
if err != nil {
157-
return true, err
156+
t.Logf("Error getting ray cluster: %v", err)
157+
return -1 // Return -1 to indicate error condition
158158
}
159159
t.Logf("Found ray cluster with %d available worker replicas", rayCluster.Status.AvailableWorkerReplicas)
160-
return rayCluster.Status.AvailableWorkerReplicas == 0, nil
161-
})
160+
return rayCluster.Status.AvailableWorkerReplicas
161+
}, TestTimeoutMedium, TestPollingInterval).Should(gomega.Equal(int32(0)))
162162
require.NoError(t, err, "No error expected")
163163
}

apiserver/test/e2e/cluster_server_e2e_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ import (
55
"net/http"
66
"testing"
77

8-
"github.com/stretchr/testify/require"
9-
108
kuberayHTTP "github.com/ray-project/kuberay/apiserver/pkg/http"
119
api "github.com/ray-project/kuberay/proto/go_client"
1210

1311
rayv1api "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
12+
13+
"github.com/stretchr/testify/require"
1414
)
1515

1616
// TestCreateClusterEndpoint sequentially iterates over the create cluster endpoint
@@ -503,7 +503,7 @@ func TestDeleteCluster(t *testing.T) {
503503
if tc.ExpectedError == nil {
504504
require.NoError(t, err, "No error expected")
505505
require.Nil(t, actualRPCStatus, "No RPC status expected")
506-
waitForDeletedCluster(t, tCtx, tc.Input.Name)
506+
waitForClusterToDisappear(t, tCtx, tc.Input.Name)
507507
} else {
508508
require.EqualError(t, err, tc.ExpectedError.Error(), "Matching error expected")
509509
require.NotNil(t, actualRPCStatus, "A not nill RPC status is required")

apiserver/test/e2e/config_server_e2e_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ import (
55
"reflect"
66
"testing"
77

8-
"github.com/stretchr/testify/require"
9-
108
kuberayHTTP "github.com/ray-project/kuberay/apiserver/pkg/http"
119
api "github.com/ray-project/kuberay/proto/go_client"
10+
11+
"github.com/stretchr/testify/require"
1212
)
1313

1414
// TestCreateTemplate sequentially iterates over the create compute endpoint

apiserver/test/e2e/job_server_e2e_test.go

+6-43
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,16 @@
11
package e2e
22

33
import (
4-
"context"
54
"fmt"
65
"net/http"
7-
"slices"
86
"testing"
9-
"time"
10-
11-
"github.com/stretchr/testify/assert"
12-
"github.com/stretchr/testify/require"
13-
"k8s.io/apimachinery/pkg/util/wait"
147

158
kuberayHTTP "github.com/ray-project/kuberay/apiserver/pkg/http"
169
api "github.com/ray-project/kuberay/proto/go_client"
1710

1811
rayv1api "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
12+
13+
"github.com/stretchr/testify/require"
1914
)
2015

2116
func TestCreateJobWithDisposableClusters(t *testing.T) {
@@ -94,7 +89,6 @@ func TestCreateJobWithDisposableClusters(t *testing.T) {
9489
ShutdownAfterJobFinishes: true,
9590
ClusterSpec: clusterSpec,
9691
TtlSecondsAfterFinished: 60,
97-
ActiveDeadlineSeconds: 120,
9892
},
9993
Namespace: tCtx.GetNamespaceName(),
10094
},
@@ -113,7 +107,6 @@ func TestCreateJobWithDisposableClusters(t *testing.T) {
113107
ShutdownAfterJobFinishes: true,
114108
ClusterSpec: clusterSpec,
115109
TtlSecondsAfterFinished: 60,
116-
ActiveDeadlineSeconds: 120,
117110
},
118111
Namespace: tCtx.GetNamespaceName(),
119112
},
@@ -223,7 +216,7 @@ func TestCreateJobWithDisposableClusters(t *testing.T) {
223216
require.NoError(t, err, "No error expected")
224217
require.Nil(t, actualRPCStatus, "No RPC status expected")
225218
require.NotNil(t, actualJob, "A job is expected")
226-
waitForRayJob(t, tCtx, tc.Input.Job.Name, []rayv1api.JobStatus{tc.ExpectedJobStatus})
219+
waitForRayJobInExpectedStatuses(t, tCtx, tc.Input.Job.Name, []rayv1api.JobStatus{tc.ExpectedJobStatus})
227220
tCtx.DeleteRayJobByName(t, actualJob.Name)
228221
} else {
229222
require.EqualError(t, err, tc.ExpectedError.Error(), "Matching error expected")
@@ -281,7 +274,7 @@ func TestDeleteJob(t *testing.T) {
281274
if tc.ExpectedError == nil {
282275
require.NoError(t, err, "No error expected")
283276
require.Nil(t, actualRPCStatus, "No RPC status expected")
284-
waitForDeletedRayJob(t, tCtx, testJobRequest.Job.Name)
277+
waitForRayJobToDisappear(t, tCtx, testJobRequest.Job.Name)
285278
} else {
286279
require.EqualError(t, err, tc.ExpectedError.Error(), "Matching error expected")
287280
require.NotNil(t, actualRPCStatus, "A not nill RPC status is required")
@@ -650,7 +643,7 @@ func TestCreateJobWithClusterSelector(t *testing.T) {
650643
require.NoError(t, err, "No error expected")
651644
require.Nil(t, actualRPCStatus, "No RPC status expected")
652645
require.NotNil(t, actualJob, "A job is expected")
653-
waitForRayJob(t, tCtx, tc.Input.Job.Name, []rayv1api.JobStatus{tc.ExpectedJobStatus})
646+
waitForRayJobInExpectedStatuses(t, tCtx, tc.Input.Job.Name, []rayv1api.JobStatus{tc.ExpectedJobStatus})
654647
tCtx.DeleteRayJobByName(t, actualJob.Name)
655648
} else {
656649
require.EqualError(t, err, tc.ExpectedError.Error(), "Matching error expected")
@@ -728,36 +721,6 @@ func createTestJob(t *testing.T, tCtx *End2EndTestingContext, expectedJobStatues
728721
require.NoError(t, err, "No error expected")
729722
require.Nil(t, actualRPCStatus, "No RPC status expected")
730723
require.NotNil(t, actualJob, "A job is expected")
731-
waitForRayJob(t, tCtx, testJobRequest.Job.Name, expectedJobStatues)
724+
waitForRayJobInExpectedStatuses(t, tCtx, testJobRequest.Job.Name, expectedJobStatues)
732725
return testJobRequest
733726
}
734-
735-
func waitForRayJob(t *testing.T, tCtx *End2EndTestingContext, rayJobName string, expectedJobStatuses []rayv1api.JobStatus) {
736-
// `expectedJobStatuses` is a slice of job statuses that we expect the job to be in
737-
// wait for the job to be in any of the `expectedJobStatuses` state for 3 minutes
738-
// if is not in that state, return an error
739-
err := wait.PollUntilContextTimeout(tCtx.ctx, 500*time.Millisecond, 3*time.Minute, false, func(_ context.Context) (done bool, err error) {
740-
rayJob, err := tCtx.GetRayJobByName(rayJobName)
741-
if err != nil {
742-
return true, err
743-
}
744-
t.Logf("Found ray job with state '%s' for ray job '%s'", rayJob.Status.JobStatus, rayJobName)
745-
return slices.Contains(expectedJobStatuses, rayJob.Status.JobStatus), nil
746-
})
747-
require.NoErrorf(t, err, "No error expected when getting status for ray job: '%s', err %v", rayJobName, err)
748-
}
749-
750-
func waitForDeletedRayJob(t *testing.T, tCtx *End2EndTestingContext, jobName string) {
751-
// wait for the job to be deleted
752-
// if is not in that state, return an error
753-
err := wait.PollUntilContextTimeout(tCtx.ctx, 500*time.Millisecond, 3*time.Minute, false, func(_ context.Context) (done bool, err error) {
754-
rayJob, err := tCtx.GetRayJobByName(jobName)
755-
if err != nil &&
756-
assert.EqualError(t, err, "rayjobs.ray.io \""+jobName+"\" not found") {
757-
return true, nil
758-
}
759-
t.Logf("Found status of '%s' for ray cluster '%s'", rayJob.Status.JobStatus, jobName)
760-
return false, err
761-
})
762-
require.NoErrorf(t, err, "No error expected when deleting ray job: '%s', err %v", jobName, err)
763-
}

apiserver/test/e2e/job_submission_e2e_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ package e2e
33
import (
44
"testing"
55

6-
"github.com/stretchr/testify/require"
7-
86
api "github.com/ray-project/kuberay/proto/go_client"
7+
8+
"github.com/stretchr/testify/require"
99
)
1010

1111
func TestCreateJobSubmission(t *testing.T) {

apiserver/test/e2e/service_server_e2e_test.go

+3-83
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,13 @@
11
package e2e
22

33
import (
4-
"context"
54
"net/http"
65
"testing"
7-
"time"
8-
9-
"github.com/stretchr/testify/assert"
10-
"github.com/stretchr/testify/require"
11-
"google.golang.org/protobuf/proto"
12-
"k8s.io/apimachinery/pkg/util/wait"
136

147
kuberayHTTP "github.com/ray-project/kuberay/apiserver/pkg/http"
158
api "github.com/ray-project/kuberay/proto/go_client"
9+
10+
"github.com/stretchr/testify/require"
1611
)
1712

1813
// TestServiceServerV2 sequentially iterates over the endpoints of the service endpoints using
@@ -199,7 +194,7 @@ func TestDeleteService(t *testing.T) {
199194
if tc.ExpectedError == nil {
200195
require.NoError(t, err, "No error expected")
201196
require.Nil(t, actualRPCStatus, "No RPC status expected")
202-
waitForDeletedService(t, tCtx, testServiceRequest.Service.Name)
197+
waitForServiceToDisappear(t, tCtx, testServiceRequest.Service.Name)
203198
} else {
204199
require.EqualError(t, err, tc.ExpectedError.Error(), "Matching error expected")
205200
require.NotNil(t, actualRPCStatus, "A not nill RPC status is required")
@@ -208,66 +203,6 @@ func TestDeleteService(t *testing.T) {
208203
}
209204
}
210205

211-
func TestUpdateRayService(t *testing.T) {
212-
tCtx, err := NewEnd2EndTestingContext(t)
213-
require.NoError(t, err)
214-
215-
tCtx.CreateComputeTemplate(t)
216-
t.Cleanup(func() {
217-
tCtx.DeleteComputeTemplate(t)
218-
})
219-
220-
testServiceRequest := createTestServiceV2(t, tCtx)
221-
serviceName := testServiceRequest.Service.Name
222-
namespace := testServiceRequest.Namespace
223-
224-
t.Cleanup(func() {
225-
tCtx.DeleteRayService(t, serviceName)
226-
})
227-
228-
// Verify the original value before update
229-
require.Equal(t, int32(1), testServiceRequest.Service.ClusterSpec.WorkerGroupSpec[0].Replicas)
230-
231-
// Clone the original ClusterSpec (proto.Clone avoids copying sync.Mutex)
232-
oldSpec := proto.Clone(testServiceRequest.Service.ClusterSpec).(*api.ClusterSpec)
233-
234-
// Only modify replicas
235-
oldSpec.WorkerGroupSpec[0].Replicas = 2 // Only field updated in this test
236-
237-
// Construct updated service from old spec
238-
updatedService := &api.RayService{
239-
Name: serviceName,
240-
Namespace: namespace,
241-
User: testServiceRequest.Service.User,
242-
Version: testServiceRequest.Service.Version,
243-
ServeConfig_V2: testServiceRequest.Service.ServeConfig_V2,
244-
ServiceUnhealthySecondThreshold: testServiceRequest.Service.ServiceUnhealthySecondThreshold,
245-
DeploymentUnhealthySecondThreshold: testServiceRequest.Service.DeploymentUnhealthySecondThreshold,
246-
ClusterSpec: oldSpec,
247-
}
248-
249-
updateReq := &api.UpdateRayServiceRequest{
250-
Service: updatedService,
251-
Namespace: namespace,
252-
Name: serviceName,
253-
}
254-
255-
// Perform the update
256-
respService, actualRPCStatus, err := tCtx.GetRayAPIServerClient().UpdateRayService(updateReq)
257-
require.NoError(t, err)
258-
require.Nil(t, actualRPCStatus)
259-
require.NotNil(t, respService)
260-
261-
// Confirm update via RayService CRD
262-
crdRayService, err := tCtx.GetRayServiceByName(serviceName)
263-
require.NoError(t, err)
264-
require.NotNil(t, crdRayService)
265-
266-
require.GreaterOrEqual(t, len(crdRayService.Spec.RayClusterSpec.WorkerGroupSpecs), 1)
267-
require.NotNil(t, crdRayService.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas)
268-
require.Equal(t, int32(2), *crdRayService.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas)
269-
}
270-
271206
func TestGetAllServices(t *testing.T) {
272207
tCtx, err := NewEnd2EndTestingContext(t)
273208
require.NoError(t, err, "No error expected when creating testing context")
@@ -537,18 +472,3 @@ func checkRayServiceCreatedSuccessfully(t *testing.T, tCtx *End2EndTestingContex
537472
require.NoError(t, err)
538473
require.NotNil(t, rayService)
539474
}
540-
541-
func waitForDeletedService(t *testing.T, tCtx *End2EndTestingContext, serviceName string) {
542-
// wait for the service to be deleted
543-
// if is not in that state, return an error
544-
err := wait.PollUntilContextTimeout(tCtx.ctx, 500*time.Millisecond, 3*time.Minute, false, func(_ context.Context) (done bool, err error) {
545-
rayService, err := tCtx.GetRayServiceByName(serviceName)
546-
if err != nil &&
547-
assert.EqualError(t, err, "rayservices.ray.io \""+serviceName+"\" not found") {
548-
return true, nil
549-
}
550-
t.Logf("Found status of '%s' for ray service '%s'", rayService.Status.ServiceStatus, serviceName)
551-
return false, err
552-
})
553-
require.NoErrorf(t, err, "No error expected when deleting ray service: '%s', err %v", serviceName, err)
554-
}

0 commit comments

Comments
 (0)