From 152fa5aa2fa31ba52a8163d925b152690b830280 Mon Sep 17 00:00:00 2001 From: hippo-an Date: Wed, 20 Nov 2024 14:49:01 +0900 Subject: [PATCH 1/2] avg sampling for load result data and sort the result by timestamp ascend order --- .../load/load_generator_install_service.go | 2 +- .../performance_evaluation_result_service.go | 100 ++++++++++++++++-- ...formance_evaluation_result_service_test.go | 79 ++++++++++++++ internal/utils/ioes.go | 20 +--- 4 files changed, 178 insertions(+), 23 deletions(-) create mode 100644 internal/core/load/performance_evaluation_result_service_test.go diff --git a/internal/core/load/load_generator_install_service.go b/internal/core/load/load_generator_install_service.go index 75ba62c..736d98d 100644 --- a/internal/core/load/load_generator_install_service.go +++ b/internal/core/load/load_generator_install_service.go @@ -600,7 +600,7 @@ func (l *LoadService) GetAllLoadGeneratorInstallInfo(param GetAllLoadGeneratorIn result.LoadGeneratorInstallInfoResults = infos result.TotalRows = totalRows - log.Info().Msgf("Fetched %d load generator install info results. length: %d", len(infos)) + log.Info().Msgf("Fetched %d load generator install info results.", len(infos)) return result, nil } diff --git a/internal/core/load/performance_evaluation_result_service.go b/internal/core/load/performance_evaluation_result_service.go index a5258b7..75c148c 100644 --- a/internal/core/load/performance_evaluation_result_service.go +++ b/internal/core/load/performance_evaluation_result_service.go @@ -81,10 +81,10 @@ func (l *LoadService) GetLoadTestResult(param GetLoadTestResultParam) (interface return nil, err } - var resultSummaries []ResultSummary + var resultSummaries []*ResultSummary for label, results := range resultMap { - resultSummaries = append(resultSummaries, ResultSummary{ + resultSummaries = append(resultSummaries, &ResultSummary{ Label: label, Results: results, }) @@ -159,10 +159,10 @@ func (l *LoadService) GetLastLoadTestResult(param GetLastLoadTestResultParam) (i return nil, err } - var resultSummaries []ResultSummary + var resultSummaries []*ResultSummary for label, results := range resultMap { - resultSummaries = append(resultSummaries, ResultSummary{ + resultSummaries = append(resultSummaries, &ResultSummary{ Label: label, Results: results, }) @@ -360,6 +360,71 @@ func appendResultRawData(filePath string) (map[string][]*ResultRawData, error) { return resultMap, nil } +func SampleDataByTimeInterval(data []*ResultRawData, intervalMs int) []*ResultRawData { + if len(data) == 0 { + return nil + } + + sort.Slice(data, func(i, j int) bool { + return data[i].Timestamp.Before(data[j].Timestamp) + }) + + var sampledData []*ResultRawData + startTime := data[0].Timestamp + var currentInterval []*ResultRawData + + for _, record := range data { + elapsedTime := record.Timestamp.Sub(startTime).Milliseconds() + if elapsedTime < int64(intervalMs) { + currentInterval = append(currentInterval, record) + } else { + if len(currentInterval) > 0 { + sampledData = append(sampledData, calculateRepresentative(currentInterval)) + } + startTime = record.Timestamp + currentInterval = []*ResultRawData{record} + } + } + + if len(currentInterval) > 0 { + sampledData = append(sampledData, calculateRepresentative(currentInterval)) + } + + return sampledData +} + +func calculateRepresentative(data []*ResultRawData) *ResultRawData { + if len(data) == 0 { + return nil + } + + var totalElapsed int + for _, record := range data { + totalElapsed += record.Elapsed + } + avgElapsed := totalElapsed / len(data) + + // Find the record closest to the average elapsed time + var closestRecord *ResultRawData + minDiff := int(^uint(0) >> 1) // Max int value + for _, record := range data { + diff := abs(record.Elapsed - avgElapsed) + if diff < minDiff { + minDiff = diff + closestRecord = record + } + } + + return closestRecord +} + +func abs(a int) int { + if a < 0 { + return -a + } + return a +} + func appendMetricsRawData(mrds map[string][]*MetricsRawData, filePath string) (map[string][]*MetricsRawData, error) { csvRows, err := utils.ReadCSV(filePath) if err != nil || csvRows == nil { @@ -420,15 +485,29 @@ func appendMetricsRawData(mrds map[string][]*MetricsRawData, filePath string) (m } } + for _, vals := range mrds { + if len(vals) > 0 { + sort.Slice(vals, func(i, j int) bool { + return vals[i].Timestamp.Before(vals[j].Timestamp) + }) + } + } + return mrds, nil } -func aggregate(resultRawDatas []ResultSummary) []*LoadTestStatistics { +func aggregate(resultRawDatas []*ResultSummary) []*LoadTestStatistics { var statistics []*LoadTestStatistics for i := range resultRawDatas { record := resultRawDatas[i] + + if record == nil { + log.Warn().Msgf("record must not be nil; %d", i) + continue + } + var requestCount, totalElapsed, totalBytes, totalSentBytes, errorCount int var elapsedList []int if len(record.Results) < 1 { @@ -500,7 +579,7 @@ func aggregate(resultRawDatas []ResultSummary) []*LoadTestStatistics { return statistics } -func resultFormat(format constant.ResultFormat, resultSummaries []ResultSummary) (any, error) { +func resultFormat(format constant.ResultFormat, resultSummaries []*ResultSummary) (any, error) { if resultSummaries == nil { return nil, nil } @@ -508,6 +587,15 @@ func resultFormat(format constant.ResultFormat, resultSummaries []ResultSummary) switch format { case constant.Aggregate: return aggregate(resultSummaries), nil + case constant.Normal: + + for i := range resultSummaries { + rs := resultSummaries[i] + if len(rs.Results) > 0 { + rs.Results = SampleDataByTimeInterval(rs.Results, 100) + } + } + } return resultSummaries, nil diff --git a/internal/core/load/performance_evaluation_result_service_test.go b/internal/core/load/performance_evaluation_result_service_test.go new file mode 100644 index 0000000..c34605f --- /dev/null +++ b/internal/core/load/performance_evaluation_result_service_test.go @@ -0,0 +1,79 @@ +package load + +import ( + "bufio" + "fmt" + "os" + "testing" + "time" + + "github.com/rs/zerolog/log" + "github.com/stretchr/testify/assert" + "golang.org/x/exp/rand" +) + +func generateCSVData(numRecords int) (string, error) { + file, err := os.CreateTemp("/tmp", "large_data_*.csv") + if err != nil { + return "", err + } + defer file.Close() + + writer := bufio.NewWriter(file) + + _, err = writer.WriteString("timeStamp,elapsed,label,responseCode,responseMessage,threadName,dataType,success,failureMessage,bytes,sentBytes,grpThreads,allThreads,URL,Latency,IdleTime,Connect\n") + if err != nil { + return "", err + } + + // Generate random data and write to CSV + for i := 0; i < numRecords; i++ { + timestamp := time.Now().Add(time.Duration(i) * time.Millisecond).UnixMilli() + elapsed := rand.Intn(1000) // Random elapsed time + label := fmt.Sprintf("Label%d", rand.Intn(10)) // Random label + responseCode := rand.Intn(500) // Random response code + responseMessage := fmt.Sprintf("Message%d", rand.Intn(10)) // Random response message + threadName := fmt.Sprintf("Thread%d", rand.Intn(10)) // Random thread name + dataType := rand.Intn(10) // Random data type + success := rand.Intn(2) == 1 // Random success (true or false) + failureMessage := "" // Can be filled with some failure message if needed + bytes := rand.Intn(1000) // Random bytes + sentBytes := rand.Intn(1000) // Random sent bytes + grpThreads := rand.Intn(100) // Random group threads + allThreads := rand.Intn(200) // Random all threads + url := "https://example.com" // Example URL + latency := rand.Intn(500) // Random latency + idleTime := rand.Intn(200) // Random idle time + connection := rand.Intn(100) // Random connection + + _, err := writer.WriteString(fmt.Sprintf("%d,%d,%s,%d,%s,%s,%d,%t,%s,%d,%d,%d,%d,%s,%d,%d,%d\n", + timestamp, elapsed, label, responseCode, responseMessage, threadName, dataType, success, failureMessage, + bytes, sentBytes, grpThreads, allThreads, url, latency, idleTime, connection)) + if err != nil { + return "", err + } + } + + err = writer.Flush() + if err != nil { + return "", err + } + + return file.Name(), nil +} + +// {"level":"info","time":"2024-11-20T14:46:21+09:00","message":"Time taken to process CSV: 10.501558875s"} +func TestAppendResultRawData(t *testing.T) { + numRecords := 100_000_00 // 10 million + + filePath, err := generateCSVData(numRecords) + assert.NoError(t, err) + defer os.Remove(filePath) + start := time.Now() + + _, err = appendResultRawData(filePath) + assert.NoError(t, err) + + duration := time.Since(start) + log.Info().Msgf("Time taken to process CSV: %v", duration) +} diff --git a/internal/utils/ioes.go b/internal/utils/ioes.go index 73ae165..3717633 100644 --- a/internal/utils/ioes.go +++ b/internal/utils/ioes.go @@ -3,7 +3,6 @@ package utils import ( "encoding/csv" "fmt" - "io" "os" "strings" @@ -59,21 +58,10 @@ func ReadCSV(filename string) (*[][]string, error) { defer file.Close() reader := csv.NewReader(file) - - var parsedCsv [][]string - for { - line, err := reader.Read() - - if err != nil { - if err == io.EOF { - return &parsedCsv, nil - } - - log.Error().Msgf("failed to read csv file from path %s; %v", filename, err) - break - } - - parsedCsv = append(parsedCsv, line) + parsedCsv, err := reader.ReadAll() + if err != nil { + log.Printf("Failed to read CSV file from path %s; %v", filename, err) + return nil, err } return &parsedCsv, nil From 4bcfd973f48f3e28503ae20a71021ff6524dad8c Mon Sep 17 00:00:00 2001 From: hippo-an Date: Thu, 21 Nov 2024 11:22:07 +0900 Subject: [PATCH 2/2] update related system image version and readme file --- README.md | 10 +++++----- docker-compose.yaml | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index f87fb70..8be1bd6 100644 --- a/README.md +++ b/README.md @@ -59,8 +59,8 @@ These functionalities are integrated with other subsystems, namely `CB-Tumblebug - Container: Docker 25.0.0 ### Subsystem Dependency -- CB-Spider : v0.9.0 <- **cost explorer anycall handler not yet implemented version.** -- CB-Tumblebug : v0.9.7 +- CB-Spider : v0.10.0 +- CB-Tumblebug : v0.10.0 --- @@ -108,13 +108,13 @@ Follow the guide for initializing CB-Tumblebug to configure multi-cloud informat ### Pre-Configuration for Performance Evaluation ⭐⭐ To correctly use the performance evaluation features provided by CM-ANT, the following steps are required: -- Register appropriate permissions for VM provisioning with the registered credentials. [TBD] +- Register appropriate permissions for VM provisioning with the registered credentials. ### Pre-Configuration for Price and Cost Features ⭐⭐ To correctly use the price and cost features provided by CM-ANT, the following steps are required: - Enable AWS Cost Explorer and set up daily granularity resource-level data. -- Register appropriate permissions for price and cost retrieval with the registered credentials. [TBD] +- Register appropriate permissions for price and cost retrieval with the registered credentials. @@ -137,7 +137,7 @@ To correctly use the price and cost features provided by CM-ANT, the following ## How to Use 🔍 #### 👉 [CM-ANT Swagger API Doc](https://cloud-barista.github.io/api/?url=https://raw.githubusercontent.com/cloud-barista/cm-ant/main/api/swagger.yaml) -[TBD] +#### 👉 [Simple and Sample guide](https://github.com/cloud-barista/cm-ant/wiki/CM%E2%80%90ANT-Simple--&-Sample-API-Usage-guide) diff --git a/docker-compose.yaml b/docker-compose.yaml index beae8cb..0add4d0 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -44,14 +44,14 @@ services: - cm-ant-net healthcheck: test: [ "CMD", "pg_isready", "-U", "cm-ant-user", "-d", "cm-ant-db" ] - interval: 30s + interval: 1m timeout: 5s retries: 5 start_period: 10s restart: unless-stopped cb-tumblebug: - image: cloudbaristaorg/cb-tumblebug:0.9.21 + image: cloudbaristaorg/cb-tumblebug:0.10.0 container_name: cb-tumblebug platform: linux/amd64 ports: @@ -148,7 +148,7 @@ services: restart: unless-stopped cb-spider: - image: cloudbaristaorg/cb-spider:0.9.8 + image: cloudbaristaorg/cb-spider:0.10.0 container_name: cb-spider platform: linux/amd64 networks: