diff --git a/apiserver/test/e2e/cluster_server_autoscaler_e2e_test.go b/apiserver/test/e2e/cluster_server_autoscaler_e2e_test.go
index a8205d586c9..c025f1ea279 100644
--- a/apiserver/test/e2e/cluster_server_autoscaler_e2e_test.go
+++ b/apiserver/test/e2e/cluster_server_autoscaler_e2e_test.go
@@ -91,7 +91,13 @@ func TestCreateClusterAutoscaler(t *testing.T) {
 	}
 
 	// Create cluster
+	// Sample pod metrics in the test namespace while the cluster runs; stop
+	// sampling and log the observed peaks when the test exits.
+	metricsResult := make([]string, 0, 50)
+	stop := LogPodMetrics(clusterReq.Namespace, 5*time.Second, &metricsResult)
+	defer cleanupAndProcessMetrics(t, stop, &metricsResult)
+
 	actualCluster, actualRPCStatus, err := tCtx.GetRayAPIServerClient().CreateCluster(&clusterReq)
 	require.NoError(t, err, "No error expected")
 	require.Nil(t, actualRPCStatus, "No RPC status expected")
 	require.NotNil(t, actualCluster, "A cluster is expected")
diff --git a/apiserver/test/e2e/cluster_server_e2e_test.go b/apiserver/test/e2e/cluster_server_e2e_test.go
index b6682b6bf3f..f168f1c6de7 100644
--- a/apiserver/test/e2e/cluster_server_e2e_test.go
+++ b/apiserver/test/e2e/cluster_server_e2e_test.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"net/http"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 
@@ -427,8 +428,14 @@
 	// Execute tests sequentially
 	for _, tc := range tests {
 		tc := tc // capture range variable
 		t.Run(tc.Name, func(t *testing.T) {
 			actualCluster, actualRPCStatus, err := tCtx.GetRayAPIServerClient().CreateCluster(tc.Input)
+			// Sample pod metrics only for test cases that target a real namespace.
+			if tc.Input.Namespace != "" {
+				metricsResult := make([]string, 0, 50)
+				stop := LogPodMetrics(tc.Input.Namespace, 5*time.Second, &metricsResult)
+				defer cleanupAndProcessMetrics(t, stop, &metricsResult)
+			}
 			if tc.ExpectedError == nil {
 				require.NoError(t, err, "No error expected")
 				require.Nil(t, actualRPCStatus, "No RPC status expected")
diff --git a/apiserver/test/e2e/job_server_e2e_test.go b/apiserver/test/e2e/job_server_e2e_test.go
index e619b9e0e41..4145e08091d 100644
--- a/apiserver/test/e2e/job_server_e2e_test.go
+++ b/apiserver/test/e2e/job_server_e2e_test.go
@@ -218,7 +218,13 @@ func TestCreateJobWithDisposableClusters(t *testing.T) {
 	for _, tc := range tests {
 		tc := tc // capture range variable
 		t.Run(tc.Name, func(t *testing.T) {
+			// Sample pod metrics in the job's namespace; stop sampling and log
+			// the observed peaks when the subtest ends.
+			metricsResult := make([]string, 0, 50)
+			stop := LogPodMetrics(tc.Input.Namespace, 5*time.Second, &metricsResult)
+			defer cleanupAndProcessMetrics(t, stop, &metricsResult)
+
 			actualJob, actualRPCStatus, err := tCtx.GetRayAPIServerClient().CreateRayJob(tc.Input)
 			if tc.ExpectedError == nil {
 				require.NoError(t, err, "No error expected")
 				require.Nil(t, actualRPCStatus, "No RPC status expected")
@@ -227,7 +233,7 @@
 				tCtx.DeleteRayJobByName(t, actualJob.Name)
 			} else {
 				require.EqualError(t, err, tc.ExpectedError.Error(), "Matching error expected")
-				require.NotNil(t, actualRPCStatus, "A not nill RPC status is required")
+				require.NotNil(t, actualRPCStatus, "A not nil RPC status is required")
 			}
 		})
 	}
diff --git a/apiserver/test/e2e/utils.go b/apiserver/test/e2e/utils.go
index 716c3037f6e..91904dc5161 100644
--- a/apiserver/test/e2e/utils.go
+++ b/apiserver/test/e2e/utils.go
@@ -7,6 +7,8 @@ import (
 	"encoding/json"
 	"io"
 	"net/http"
+	"os/exec"
+	"strconv"
 	"strings"
 	"testing"
 	"time"
@@ -103,3 +105,75 @@ func waitForDeletedCluster(t *testing.T, tCtx *End2EndTestingContext, clusterNam
 	})
 	require.NoErrorf(t, err, "No error expected when deleting ray cluster: '%s', err %v", clusterName, err)
 }
+
+// LogPodMetrics periodically runs `kubectl top pod` in the given namespace and
+// appends each sample (with the header row stripped) to result. The returned
+// stop function halts sampling and waits for the sampler goroutine to exit, so
+// result is safe to read once it returns.
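+//
+// A typical call site looks like the following (a sketch; the namespace,
+// interval, and initial capacity are illustrative, not prescribed):
+//
+//	metricsResult := make([]string, 0, 50)
+//	stop := LogPodMetrics("some-namespace", 5*time.Second, &metricsResult)
+//	defer cleanupAndProcessMetrics(t, stop, &metricsResult)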
expected") diff --git a/apiserver/test/e2e/utils.go b/apiserver/test/e2e/utils.go index 716c3037f6e..91904dc5161 100644 --- a/apiserver/test/e2e/utils.go +++ b/apiserver/test/e2e/utils.go @@ -7,6 +7,8 @@ import ( "encoding/json" "io" "net/http" + "os/exec" + "strconv" "strings" "testing" "time" @@ -103,3 +105,66 @@ func waitForDeletedCluster(t *testing.T, tCtx *End2EndTestingContext, clusterNam }) require.NoErrorf(t, err, "No error expected when deleting ray cluster: '%s', err %v", clusterName, err) } + +// LogPodMetrics captures pod memory usage and logs it to a file +func LogPodMetrics(observedNamespace string, interval time.Duration, result *[]string) (chan<- struct{}, error) { + stopCh := make(chan struct{}) + go func() { + for { + select { + case <-stopCh: + return + default: + cmd := exec.Command("kubectl", "top", "pod", "-n", observedNamespace) + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + continue + } + + lines := strings.Split(stdout.String(), "\n") + if len(lines) > 1 { + *result = append(*result, strings.Join(lines[1:], "\n")) + } + + time.Sleep(interval) + } + } + }() + return stopCh, nil +} + +func cleanupAndProcessMetrics(t *testing.T, stopCh *chan<- struct{}, result *[]string) { + close(*stopCh) + resultString := "" + peakCPU := float64(0) + peakMemory := float64(0) + for _, s := range *result { + lines := strings.Split(s, "\n") + + for _, line := range lines { + if line == "" { + continue + } + fields := strings.Fields(line) + if len(fields) >= 3 { + cpu := strings.TrimSuffix(fields[1], "m") + mem := strings.TrimSuffix(fields[2], "Mi") + cpuVal, _ := strconv.ParseFloat(cpu, 64) + memVal, _ := strconv.ParseFloat(mem, 64) + if cpuVal > peakCPU { + peakCPU = cpuVal + } + if memVal > peakMemory { + peakMemory = memVal + } + } + } + resultString += s + + } + t.Logf("Metrics result:\n%s\n", resultString) + t.Logf("\nPeak CPU usage: %.1fm\nPeak Memory usage: %.1fMi", peakCPU, peakMemory) +}