Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion apiserver/test/e2e/cluster_server_autoscaler_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ func TestCreateClusterAutoscaler(t *testing.T) {
}

// Create cluster
actualCluster, actualRPCStatus, err := tCtx.GetRayAPIServerClient().CreateCluster(&clusterReq)
actualCluster, actualRPCStatus, _ := tCtx.GetRayAPIServerClient().CreateCluster(&clusterReq)
metricsResult := make([]string, 50)
stopCh, err := LogPodMetrics(actualCluster.Namespace, 5*time.Second, &metricsResult)
defer cleanupAndProcessMetrics(t, &stopCh, &metricsResult)
require.NoError(t, err, "No error expected")
require.Nil(t, actualRPCStatus, "No RPC status expected")
require.NotNil(t, actualCluster, "A cluster is expected")
Expand Down
7 changes: 7 additions & 0 deletions apiserver/test/e2e/cluster_server_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -427,8 +428,14 @@ func TestCreateClusterEndpoint(t *testing.T) {
// Execute tests sequentially
for _, tc := range tests {
tc := tc // capture range variable
require.NoError(t, err, "No error expected")
t.Run(tc.Name, func(t *testing.T) {
actualCluster, actualRPCStatus, err := tCtx.GetRayAPIServerClient().CreateCluster(tc.Input)
if tc.Input.Namespace != "" {
metricsResult := make([]string, 50)
stopCh, _ := LogPodMetrics(tc.Input.Namespace, 5*time.Second, &metricsResult)
defer cleanupAndProcessMetrics(t, &stopCh, &metricsResult)
}
if tc.ExpectedError == nil {
require.NoError(t, err, "No error expected")
require.Nil(t, actualRPCStatus, "No RPC status expected")
Expand Down
9 changes: 7 additions & 2 deletions apiserver/test/e2e/job_server_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,11 @@ func TestCreateJobWithDisposableClusters(t *testing.T) {
for _, tc := range tests {
tc := tc // capture range variable
t.Run(tc.Name, func(t *testing.T) {
actualJob, actualRPCStatus, err := tCtx.GetRayAPIServerClient().CreateRayJob(tc.Input)
actualJob, actualRPCStatus, _ := tCtx.GetRayAPIServerClient().CreateRayJob(tc.Input)
metricsResult := make([]string, 50)
stopCh, err := LogPodMetrics(tc.Input.Namespace, 5*time.Second, &metricsResult)
defer cleanupAndProcessMetrics(t, &stopCh, &metricsResult)
require.NoError(t, err)
if tc.ExpectedError == nil {
require.NoError(t, err, "No error expected")
require.Nil(t, actualRPCStatus, "No RPC status expected")
Expand All @@ -227,7 +231,7 @@ func TestCreateJobWithDisposableClusters(t *testing.T) {
tCtx.DeleteRayJobByName(t, actualJob.Name)
} else {
require.EqualError(t, err, tc.ExpectedError.Error(), "Matching error expected")
require.NotNil(t, actualRPCStatus, "A not nill RPC status is required")
require.NotNil(t, actualRPCStatus, "A not nil RPC status is required")
}
})
}
Expand Down Expand Up @@ -277,6 +281,7 @@ func TestDeleteJob(t *testing.T) {
for _, tc := range tests {
tc := tc // capture range variable
t.Run(tc.Name, func(t *testing.T) {
require.NoError(t, err)
actualRPCStatus, err := tCtx.GetRayAPIServerClient().DeleteRayJob(tc.Input)
if tc.ExpectedError == nil {
require.NoError(t, err, "No error expected")
Expand Down
65 changes: 65 additions & 0 deletions apiserver/test/e2e/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"encoding/json"
"io"
"net/http"
"os/exec"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -103,3 +105,66 @@ func waitForDeletedCluster(t *testing.T, tCtx *End2EndTestingContext, clusterNam
})
require.NoErrorf(t, err, "No error expected when deleting ray cluster: '%s', err %v", clusterName, err)
}

// LogPodMetrics captures pod memory usage and logs it to a file
func LogPodMetrics(observedNamespace string, interval time.Duration, result *[]string) (chan<- struct{}, error) {
stopCh := make(chan struct{})
go func() {
for {
select {
case <-stopCh:
return
default:
cmd := exec.Command("kubectl", "top", "pod", "-n", observedNamespace)
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr

if err := cmd.Run(); err != nil {
continue
}

lines := strings.Split(stdout.String(), "\n")
if len(lines) > 1 {
*result = append(*result, strings.Join(lines[1:], "\n"))
}

time.Sleep(interval)
}
}
}()
return stopCh, nil
}

// cleanupAndProcessMetrics stops the metrics sampler by closing *stopCh, then
// logs the raw collected samples along with the peak CPU (milli-cores) and
// peak memory (MiB) observed across all of them.
//
// Each entry in *result is the body of one `kubectl top pod` run: newline
// separated rows of "NAME CPU(cores) MEMORY(bytes)", e.g. "pod-a 100m 200Mi".
// Rows with fewer than three fields are ignored; unparsable numbers fall back
// to 0 (ParseFloat errors are deliberately swallowed — this is best-effort
// diagnostics, not a test assertion).
func cleanupAndProcessMetrics(t *testing.T, stopCh *chan<- struct{}, result *[]string) {
	close(*stopCh)

	// strings.Builder avoids the quadratic cost of += concatenation in a loop.
	var raw strings.Builder
	peakCPU := 0.0
	peakMemory := 0.0
	for _, sample := range *result {
		raw.WriteString(sample)
		for _, line := range strings.Split(sample, "\n") {
			if line == "" {
				continue
			}
			fields := strings.Fields(line)
			if len(fields) < 3 {
				continue
			}
			// kubectl top reports CPU as "<n>m" and memory as "<n>Mi".
			cpuVal, _ := strconv.ParseFloat(strings.TrimSuffix(fields[1], "m"), 64)
			memVal, _ := strconv.ParseFloat(strings.TrimSuffix(fields[2], "Mi"), 64)
			if cpuVal > peakCPU {
				peakCPU = cpuVal
			}
			if memVal > peakMemory {
				peakMemory = memVal
			}
		}
	}
	t.Logf("Metrics result:\n%s\n", raw.String())
	t.Logf("\nPeak CPU usage: %.1fm\nPeak Memory usage: %.1fMi", peakCPU, peakMemory)
}
Loading