diff --git a/cmd/loadeljson.go b/cmd/loadeljson.go new file mode 100644 index 0000000..d73507a --- /dev/null +++ b/cmd/loadeljson.go @@ -0,0 +1,48 @@ +package cmd + +import ( + "fmt" + "os" + "pbench/cmd/loadeljson" + "pbench/utils" + "runtime" + "time" + + "github.com/spf13/cobra" +) + +// loadElJsonCmd represents the loadeljson command +var loadElJsonCmd = &cobra.Command{ + Use: `loadeljson [flags] [list of files or directories to process]`, + Short: "Load event listener JSON files into database and run recorders", + Long: `Load event listener JSON files (QueryCompletedEvent) into database and run recorders`, + DisableFlagsInUseLine: true, + Args: func(cmd *cobra.Command, args []string) error { + if len(args) < 1 { + return fmt.Errorf("requires at least 1 arg, only received %d", len(args)) + } + if loadeljson.Parallelism < 1 || loadeljson.Parallelism > runtime.NumCPU() { + return fmt.Errorf("invalid parallelism: %d, it should be >= 1 and <= %d", loadeljson.Parallelism, runtime.NumCPU()) + } + utils.ExpandHomeDirectory(&loadeljson.MySQLCfgPath) + utils.ExpandHomeDirectory(&loadeljson.InfluxCfgPath) + utils.ExpandHomeDirectory(&loadeljson.OutputPath) + return nil + }, + Run: loadeljson.Run, +} + +func init() { + RootCmd.AddCommand(loadElJsonCmd) + wd, _ := os.Getwd() + loadElJsonCmd.Flags().StringVarP(&loadeljson.RunName, "name", "n", fmt.Sprintf("load_el_%s", time.Now().Format(utils.DirectoryNameTimeFormat)), `Assign a name to this run. (default: "load_el_")`) + loadElJsonCmd.Flags().StringVarP(&loadeljson.Comment, "comment", "c", "", `Add a comment to this run (optional)`) + loadElJsonCmd.Flags().BoolVarP(&loadeljson.RecordRun, "record-run", "r", false, "Record all the loaded JSON as a run") + loadElJsonCmd.Flags().StringVarP(&loadeljson.OutputPath, "output-path", "o", wd, "Output directory path") + loadElJsonCmd.Flags().IntVarP(&loadeljson.Parallelism, "parallel", "P", runtime.NumCPU(), "Number of parallel threads to load json files") + loadElJsonCmd.Flags().StringVar(&loadeljson.InfluxCfgPath, "influx", "", "InfluxDB connection config for run recorder (optional)") + loadElJsonCmd.Flags().StringVar(&loadeljson.MySQLCfgPath, "mysql", "", "MySQL connection config for event listener and run recorder (optional)") + loadElJsonCmd.Flags().BoolVar(&loadeljson.IsNDJSON, "ndjson", false, "Process files as NDJSON (newline-delimited JSON) format") +} + +// Made with Bob diff --git a/cmd/loadeljson/db.go b/cmd/loadeljson/db.go new file mode 100644 index 0000000..b0123ae --- /dev/null +++ b/cmd/loadeljson/db.go @@ -0,0 +1,385 @@ +package loadeljson + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "pbench/log" +) + +// insertEventListenerData inserts data into multiple tables to mirror Java implementation in MySQLWriter.post() +func insertEventListenerData(ctx context.Context, db *sql.DB, qce *QueryCompletedEvent, queryId string) error { + // 1. Insert into presto_query_creation_info + if err := insertQueryCreationInfo(ctx, db, qce, queryId); err != nil { + return fmt.Errorf("failed to insert query creation info: %w", err) + } + + // 2. Insert into presto_query_plans + if err := insertQueryPlans(ctx, db, qce, queryId); err != nil { + return fmt.Errorf("failed to insert query plans: %w", err) + } + + // 3. Insert into presto_query_stage_stats + if err := insertQueryStageStats(ctx, db, qce, queryId); err != nil { + return fmt.Errorf("failed to insert query stage stats: %w", err) + } + + // 4. 
Insert into presto_query_operator_stats + if err := insertQueryOperatorStats(ctx, db, qce, queryId); err != nil { + return fmt.Errorf("failed to insert query operator stats: %w", err) + } + + // 5. Insert into presto_query_statistics + if err := insertQueryStatistics(ctx, db, qce, queryId); err != nil { + return fmt.Errorf("failed to insert query statistics: %w", err) + } + + return nil +} + +func insertQueryCreationInfo(ctx context.Context, db *sql.DB, qce *QueryCompletedEvent, queryId string) error { + query := `REPLACE INTO presto_query_creation_info( + query_id, query, create_time, schema_name, catalog_name, environment, + user, remote_client_address, source, user_agent, uri, + session_properties_json, server_version, client_info, resource_group_name, + principal, transaction_id, client_tags, resource_estimates, dt + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + + var schema, catalog, remoteAddr, source, userAgent, clientInfo, principal, resourceGroupId string + if qce.Context.Schema != nil { + schema = *qce.Context.Schema + } + if qce.Context.Catalog != nil { + catalog = *qce.Context.Catalog + } + if qce.Context.RemoteClientAddress != nil { + remoteAddr = *qce.Context.RemoteClientAddress + } + if qce.Context.Source != nil { + source = *qce.Context.Source + } + if qce.Context.UserAgent != nil { + userAgent = *qce.Context.UserAgent + } + if qce.Context.ClientInfo != nil { + clientInfo = *qce.Context.ClientInfo + } + if qce.Context.Principal != nil { + principal = *qce.Context.Principal + } + if qce.Context.ResourceGroupId != nil { + resourceGroupId = qce.Context.ResourceGroupId.Value + } + + // Handle transaction_id + var transactionId string + if qce.Metadata.TransactionId != nil { + transactionId = *qce.Metadata.TransactionId + } + + // Marshal client_tags + clientTagsJson, err := json.Marshal(qce.Context.ClientTags) + if err != nil { + log.Error().Err(err).Str("query_id", queryId).Msg("failed to marshal client tags") + clientTagsJson = []byte("[]") + } + + // Marshal resource_estimates + resourceEstimatesJson, err := json.Marshal(qce.Context.ResourceEstimates) + if err != nil { + log.Error().Err(err).Str("query_id", queryId).Msg("failed to marshal resource estimates") + resourceEstimatesJson = []byte("{}") + } + + sessionPropsJson, err := json.Marshal(qce.Context.SessionProperties) + if err != nil { + log.Error().Err(err).Str("query_id", queryId).Msg("failed to marshal session properties") + sessionPropsJson = []byte("{}") + } + + dt := qce.CreateTime.Time.Format("2006-01-02 15:04:05") + + _, err = db.ExecContext(ctx, query, + queryId, qce.Metadata.Query, + qce.CreateTime.Time.Format("2006-01-02 15:04:05"), + schema, catalog, qce.Context.Environment, + qce.Context.User, remoteAddr, source, userAgent, qce.Metadata.Uri, + string(sessionPropsJson), qce.Context.ServerVersion, clientInfo, resourceGroupId, + principal, transactionId, string(clientTagsJson), string(resourceEstimatesJson), dt, + ) + + return err +} + +func insertQueryPlans(ctx context.Context, db *sql.DB, qce *QueryCompletedEvent, queryId string) error { + query := `REPLACE INTO presto_query_plans( + query_id, query, plan, json_plan, environment, dt + ) VALUES (?, ?, ?, ?, ?, ?)` + + var plan, jsonPlan string + if qce.Metadata.Plan != nil { + plan = *qce.Metadata.Plan + } + if qce.Metadata.JsonPlan != nil { + jsonPlan = *qce.Metadata.JsonPlan + } + + dt := qce.CreateTime.Time.Format("2006-01-02 15:04:05") + + _, err := db.ExecContext(ctx, query, + queryId, qce.Metadata.Query, plan, jsonPlan, 
qce.Context.Environment, dt, + ) + + return err +} + +func insertQueryStageStats(ctx context.Context, db *sql.DB, qce *QueryCompletedEvent, queryId string) error { + if len(qce.StageStatistics) == 0 { + return nil + } + + query := `REPLACE INTO presto_query_stage_stats( + query_id, stage_id, stage_execution_id, tasks, + total_scheduled_time_ms, total_cpu_time_ms, retried_cpu_time_ms, total_blocked_time_ms, + raw_input_data_size_bytes, processed_input_data_size_bytes, physical_written_data_size_bytes, + gc_statistics, cpu_distribution, memory_distribution, dt + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + + dt := qce.CreateTime.Time.Format("2006-01-02 15:04:05") + + for _, stage := range qce.StageStatistics { + // These fields are not available in the event listener JSON, so we use empty JSON objects + gcStatsJson := []byte("{}") + cpuDistJson := []byte("{}") + memDistJson := []byte("{}") + + _, err := db.ExecContext(ctx, query, + queryId, stage.StageId, stage.StageExecutionId, stage.Tasks, + stage.TotalScheduledTime.Milliseconds(), + stage.TotalCpuTime.Milliseconds(), + stage.RetriedCpuTime.Milliseconds(), + stage.TotalBlockedTime.Milliseconds(), + stage.RawInputDataSize.Bytes, + stage.ProcessedInputDataSize.Bytes, + stage.PhysicalWrittenDataSize.Bytes, + string(gcStatsJson), string(cpuDistJson), string(memDistJson), dt, + ) + + if err != nil { + return fmt.Errorf("failed to insert stage %d: %w", stage.StageId, err) + } + } + + return nil +} + +func insertQueryOperatorStats(ctx context.Context, db *sql.DB, qce *QueryCompletedEvent, queryId string) error { + if len(qce.OperatorStatistics) == 0 { + return nil + } + + query := `REPLACE INTO presto_query_operator_stats( + query_id, stage_id, stage_execution_id, pipeline_id, operator_id, + plan_node_id, operator_type, total_drivers, + add_input_calls, add_input_wall_ms, add_input_cpu_ms, add_input_allocation_bytes, + raw_input_data_size_bytes, raw_input_positions, input_data_size_bytes, input_positions, + sum_squared_input_positions, get_output_calls, get_output_wall_ms, get_output_cpu_ms, + get_output_allocation_bytes, output_data_size_bytes, output_positions, + physical_written_data_size_bytes, blocked_wall_ms, finish_calls, finish_wall_ms, + finish_cpu_ms, finish_allocation_bytes, user_memory_reservation_bytes, + revocable_memory_reservation_bytes, system_memory_reservation_bytes, + peak_user_memory_reservation_bytes, peak_system_memory_reservation_bytes, + peak_total_memory_reservation_bytes, spilled_data_size_bytes, info, dt + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + + dt := qce.CreateTime.Time.Format("2006-01-02 15:04:05") + + for _, op := range qce.OperatorStatistics { + var info string + if op.Info != nil { + info = *op.Info + } + + _, err := db.ExecContext(ctx, query, + queryId, op.StageId, op.StageExecutionId, op.PipelineId, op.OperatorId, + op.PlanNodeId, op.OperatorType, op.TotalDrivers, + op.AddInputCalls, op.AddInputWall.Milliseconds(), op.AddInputCpu.Milliseconds(), + op.AddInputAllocation.Bytes, op.RawInputDataSize.Bytes, op.RawInputPositions, + op.InputDataSize.Bytes, op.InputPositions, 0.0, // sumSquaredInputPositions not available + op.GetOutputCalls, op.GetOutputWall.Milliseconds(), op.GetOutputCpu.Milliseconds(), + op.GetOutputAllocation.Bytes, op.OutputDataSize.Bytes, op.OutputPositions, + op.PhysicalWrittenDataSize.Bytes, op.BlockedWall.Milliseconds(), op.FinishCalls, + op.FinishWall.Milliseconds(), 
op.FinishCpu.Milliseconds(), op.FinishAllocation.Bytes, + op.UserMemoryReservation.Bytes, op.RevocableMemoryReservation.Bytes, + op.SystemMemoryReservation.Bytes, op.PeakUserMemoryReservation.Bytes, + op.PeakSystemMemoryReservation.Bytes, op.PeakTotalMemoryReservation.Bytes, + op.SpilledDataSize.Bytes, info, dt, + ) + + if err != nil { + return fmt.Errorf("failed to insert operator %d: %w", op.OperatorId, err) + } + } + + return nil +} + +func insertQueryStatistics(ctx context.Context, db *sql.DB, qce *QueryCompletedEvent, queryId string) error { + // Insert into presto_query_statistics table + // This mirrors the Java implementation in PrestoQueryStatsDao.insertQueryStatistics + + query := `REPLACE INTO presto_query_statistics( + query_id, query, query_type, schema_name, catalog_name, environment, + user, remote_client_address, source, user_agent, uri, + session_properties_json, server_version, client_info, resource_group_name, + principal, transaction_id, client_tags, resource_estimates, + create_time, end_time, execution_start_time, query_state, + failure_message, failure_type, failures_json, failure_task, failure_host, + error_code, error_code_name, error_category, warnings_json, + splits, analysis_time_ms, queued_time_ms, query_wall_time_ms, + query_execution_time_ms, bytes_per_cpu_sec, bytes_per_sec, rows_per_cpu_sec, + total_bytes, total_rows, output_rows, output_bytes, + written_rows, written_bytes, cumulative_memory, peak_user_memory_bytes, + peak_total_memory_bytes, peak_task_total_memory, peak_task_user_memory, + written_intermediate_bytes, peak_node_total_memory, total_split_cpu_time_ms, + stage_count, cumulative_total_memory, dt + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + + var queryType string + if qce.QueryType != nil { + queryType = *qce.QueryType + } + + var schema, catalog, remoteAddr, source, userAgent, clientInfo, principal, resourceGroupId string + if qce.Context.Schema != nil { + schema = *qce.Context.Schema + } + if qce.Context.Catalog != nil { + catalog = *qce.Context.Catalog + } + if qce.Context.RemoteClientAddress != nil { + remoteAddr = *qce.Context.RemoteClientAddress + } + if qce.Context.Source != nil { + source = *qce.Context.Source + } + if qce.Context.UserAgent != nil { + userAgent = *qce.Context.UserAgent + } + if qce.Context.ClientInfo != nil { + clientInfo = *qce.Context.ClientInfo + } + if qce.Context.Principal != nil { + principal = *qce.Context.Principal + } + if qce.Context.ResourceGroupId != nil { + resourceGroupId = qce.Context.ResourceGroupId.Value + } + + var failureMsg, failureType, failureTask, failureHost, failuresJson string + var errorCode int + var errorCodeName, errorCategory string + + if qce.FailureInfo != nil { + if qce.FailureInfo.FailureMessage != nil { + failureMsg = *qce.FailureInfo.FailureMessage + } + if qce.FailureInfo.FailureType != nil { + failureType = *qce.FailureInfo.FailureType + } + if qce.FailureInfo.FailureTask != nil { + failureTask = *qce.FailureInfo.FailureTask + } + if qce.FailureInfo.FailureHost != nil { + failureHost = *qce.FailureInfo.FailureHost + } + failuresJson = qce.FailureInfo.FailuresJson + errorCode = qce.FailureInfo.ErrorCode.Code + errorCodeName = qce.FailureInfo.ErrorCode.Name + errorCategory = qce.FailureInfo.ErrorCode.Type + } + + // Handle transaction_id + var transactionId string + if qce.Metadata.TransactionId != nil { + transactionId = 
*qce.Metadata.TransactionId + } + + // Marshal client_tags + clientTagsJson, err := json.Marshal(qce.Context.ClientTags) + if err != nil { + log.Error().Err(err).Str("query_id", queryId).Msg("failed to marshal client tags") + clientTagsJson = []byte("[]") + } + + // Marshal resource_estimates + resourceEstimatesJson, err := json.Marshal(qce.Context.ResourceEstimates) + if err != nil { + log.Error().Err(err).Str("query_id", queryId).Msg("failed to marshal resource estimates") + resourceEstimatesJson = []byte("{}") + } + + sessionPropsJson, err := json.Marshal(qce.Context.SessionProperties) + if err != nil { + log.Error().Err(err).Str("query_id", queryId).Msg("failed to marshal session properties") + sessionPropsJson = []byte("{}") + } + warningsJson, err := json.Marshal(qce.Warnings) + if err != nil { + log.Error().Err(err).Str("query_id", queryId).Msg("failed to marshal warnings") + warningsJson = []byte("[]") + } + + var analysisTimeMs int64 + if qce.Statistics.AnalysisTime != nil { + analysisTimeMs = qce.Statistics.AnalysisTime.Milliseconds() + } + + // Calculate query_execution_time_ms from ResourceEstimates + var queryExecutionTimeMs int64 + if qce.Context.ResourceEstimates.ExecutionTime != nil { + queryExecutionTimeMs = qce.Context.ResourceEstimates.ExecutionTime.Milliseconds() + } + + cpuTimeMs := qce.Statistics.CpuTime.Milliseconds() + var bytesPerCpuSec, bytesPerSec, rowsPerCpuSec int64 + if cpuTimeMs > 0 { + bytesPerCpuSec = qce.Statistics.TotalBytes / cpuTimeMs + rowsPerCpuSec = qce.Statistics.TotalRows / cpuTimeMs + } + // Calculate bytes_per_sec from ResourceEstimates.ExecutionTime + if queryExecutionTimeMs > 0 { + bytesPerSec = qce.Statistics.TotalBytes / queryExecutionTimeMs + } + + dt := qce.CreateTime.Time.Format("2006-01-02 15:04:05") + + _, err = db.ExecContext(ctx, query, + queryId, qce.Metadata.Query, queryType, schema, catalog, qce.Context.Environment, + qce.Context.User, remoteAddr, source, userAgent, qce.Metadata.Uri, + string(sessionPropsJson), qce.Context.ServerVersion, clientInfo, resourceGroupId, + principal, transactionId, string(clientTagsJson), string(resourceEstimatesJson), + qce.CreateTime.Time.Format("2006-01-02 15:04:05"), + qce.EndTime.Time.Format("2006-01-02 15:04:05"), + qce.ExecutionStartTime.Time.Format("2006-01-02 15:04:05"), + qce.Metadata.QueryState, + failureMsg, failureType, failuresJson, failureTask, failureHost, + errorCode, errorCodeName, errorCategory, string(warningsJson), + qce.Statistics.CompletedSplits, analysisTimeMs, + qce.Statistics.QueuedTime.Milliseconds(), + qce.Statistics.WallTime.Milliseconds(), + queryExecutionTimeMs, + bytesPerCpuSec, bytesPerSec, rowsPerCpuSec, + qce.Statistics.TotalBytes, qce.Statistics.TotalRows, + qce.Statistics.OutputPositions, qce.Statistics.OutputBytes, + qce.Statistics.WrittenOutputRows, qce.Statistics.WrittenOutputBytes, qce.Statistics.CumulativeMemory, + qce.Statistics.PeakUserMemoryBytes, + qce.Statistics.PeakTotalNonRevocableMemoryBytes, qce.Statistics.PeakTaskTotalMemory, qce.Statistics.PeakTaskUserMemory, + qce.Statistics.WrittenOutputBytes, qce.Statistics.PeakNodeTotalMemory, + cpuTimeMs, + len(qce.StageStatistics), qce.Statistics.CumulativeTotalMemory, dt, + ) + + return err +} diff --git a/cmd/loadeljson/main.go b/cmd/loadeljson/main.go new file mode 100644 index 0000000..8bace4b --- /dev/null +++ b/cmd/loadeljson/main.go @@ -0,0 +1,358 @@ +package loadeljson + +import ( + "bufio" + "context" + "database/sql" + "encoding/json" + "fmt" + "os" + "os/signal" + "path/filepath" + "pbench/log" + 
"pbench/stage" + "pbench/utils" + "reflect" + "sync" + "syscall" + "time" + + "github.com/spf13/cobra" +) + +var ( + RunName string + Comment string + OutputPath string + RecordRun bool + MySQLCfgPath string + InfluxCfgPath string + Parallelism int + IsNDJSON bool + + runRecorders = make([]stage.RunRecorder, 0, 3) + queryResults = make([]*stage.QueryResult, 0, 8) + runStartTime, runEndTime = newSyncedTime(time.Now()), newSyncedTime(time.UnixMilli(0)) + mysqlDb *sql.DB + pseudoStage *stage.Stage + + parallelismGuard chan struct{} + resultChan = make(chan *stage.QueryResult) + runningTasks sync.WaitGroup +) + +func Run(_ *cobra.Command, args []string) { + OutputPath = filepath.Join(OutputPath, RunName) + utils.PrepareOutputDirectory(OutputPath) + + // also start to write logs to the output directory from this point on. + logPath := filepath.Join(OutputPath, "loadeljson.log") + flushLog := utils.InitLogFile(logPath) + defer flushLog() + + // Any error run recorder initialization will make the run recorder a noop. + // The program will continue with corresponding error logs. + mysqlDb = utils.InitMySQLConnFromCfg(MySQLCfgPath) + if RecordRun { + registerRunRecorder(stage.NewFileBasedRunRecorder()) + registerRunRecorder(stage.NewInfluxRunRecorder(InfluxCfgPath)) + registerRunRecorder(stage.NewMySQLRunRecorderWithDb(mysqlDb)) + } + + log.Info().Int("parallelism", Parallelism).Send() + ctx, cancel := context.WithCancel(context.Background()) + timeToExit := make(chan os.Signal, 1) + signal.Notify(timeToExit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + // Handle SIGINT, SIGTERM, and SIGQUIT. When ctx is canceled, in-progress MySQL transactions and InfluxDB operations will roll back. + go func() { + sig := <-timeToExit + if sig != nil { + log.Info().Msg("abort loading") + cancel() + } + }() + + // To reuse the `pbench run` code, especially run recorders, we create a pseudo main stage. + pseudoStage = &stage.Stage{ + Id: "load_el_json", + ColdRuns: &stage.RunsValueOne, + States: &stage.SharedStageStates{ + RunName: RunName, + Comment: Comment, + OutputPath: OutputPath, + RunStartTime: time.Now(), + }, + } + + // Kick off preparation work for all the run recorders + for _, recorder := range runRecorders { + if err := recorder.Start(ctx, pseudoStage); err != nil { + log.Fatal().Err(err).Msgf("failed to prepare %s", reflect.TypeOf(recorder).Name()) + } + } + + // Use this to make sure there will be no more than Parallelism goroutines. + parallelismGuard = make(chan struct{}, Parallelism) + + // This is the task scheduler go routine. It feeds files to task runners with back pressure. + go func() { + for _, path := range args { + if ctx.Err() != nil { + break + } + if err := processPath(ctx, path); err != nil { + // This whole command is not abort-on-error. We only log errors. + log.Error().Str("path", path).Err(err).Msg("failed to process path") + } + } + // Keep the main thread waiting for queryResults until all task runner finishes. + runningTasks.Wait() + close(resultChan) + }() + + for qr := range resultChan { + queryResults = append(queryResults, qr) + } + + pseudoStage.States.RunStartTime = runStartTime.GetTime() + pseudoStage.States.RunFinishTime = runEndTime.GetTime() + for _, r := range runRecorders { + r.RecordRun(utils.GetCtxWithTimeout(time.Second*5), pseudoStage, queryResults) + } + + log.Info().Int("file_loaded", len(queryResults)).Send() + // This causes the signal handler to exit. 
+ close(timeToExit) +} + +func scheduleFile(ctx context.Context, path string) { + parallelismGuard <- struct{}{} + runningTasks.Add(1) + if IsNDJSON { + go processNDJSONFile(ctx, path) + } else { + go processFile(ctx, path) + } +} + +func processFile(ctx context.Context, path string) { + defer func() { + // Allow another task runner to start. + <-parallelismGuard + runningTasks.Done() + }() + + bytes, ioErr := os.ReadFile(path) + if ioErr != nil { + log.Error().Err(ioErr).Str("path", path).Msg("failed to read file") + return + } + + processJSONBytes(ctx, path, bytes, 0) +} + +func processNDJSONFile(ctx context.Context, path string) { + defer func() { + // Allow another task runner to start. + <-parallelismGuard + runningTasks.Done() + }() + + file, err := os.Open(path) + if err != nil { + log.Error().Err(err).Str("path", path).Msg("failed to open NDJSON file") + return + } + defer file.Close() + + scanner := bufio.NewScanner(file) + // Increase buffer size to handle large JSON lines (default is 64KB, we set to 10MB) + const maxCapacity = 10 * 1024 * 1024 // 10MB + buf := make([]byte, maxCapacity) + scanner.Buffer(buf, maxCapacity) + + lineNum := 0 + for scanner.Scan() { + lineNum++ + if ctx.Err() != nil { + log.Info().Str("path", path).Msg("abort processing NDJSON file") + break + } + line := scanner.Bytes() + if len(line) == 0 { + continue + } + processJSONBytes(ctx, path, line, lineNum) + } + + if err := scanner.Err(); err != nil { + log.Error().Err(err).Str("path", path).Msg("error reading NDJSON file") + } +} + +func processJSONBytes(ctx context.Context, path string, jsonBytes []byte, lineNum int) { + queryEvent := new(QueryEvent) + // Note that this step can succeed with any valid JSON file. But we need to do some additional validation to skip + // invalid event listener JSON files. 
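+	// A payload is accepted only if it contains a queryCompletedEvent with a non-empty queryId and createTime; anything else is logged and skipped.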
+ if unmarshalErr := json.Unmarshal(jsonBytes, queryEvent); unmarshalErr != nil { + if lineNum > 0 { + log.Error().Err(unmarshalErr).Str("path", path).Int("line", lineNum).Msg("failed to unmarshal JSON") + } else { + log.Error().Err(unmarshalErr).Str("path", path).Msg("failed to unmarshal JSON") + } + return + } + + // Validate that this is a QueryCompletedEvent + if queryEvent.QueryCompletedEvent == nil { + if lineNum > 0 { + log.Error().Str("path", path).Int("line", lineNum).Msg("no QueryCompletedEvent found") + } else { + log.Error().Str("path", path).Msg("no QueryCompletedEvent found in file") + } + return + } + + qce := queryEvent.QueryCompletedEvent + if qce.Metadata.QueryId == "" || qce.CreateTime.Time.IsZero() { + if lineNum > 0 { + log.Error().Str("path", path).Int("line", lineNum).Msg("invalid QueryCompletedEvent: missing queryId or createTime") + } else { + log.Error().Str("path", path).Msg("invalid QueryCompletedEvent: missing queryId or createTime") + } + return + } + + // Copy the Plan from QueryEvent to QueryMetadata + qce.Metadata.Plan = &queryEvent.Plan + + if lineNum > 0 { + log.Info().Str("path", path).Int("line", lineNum).Msg("start to process event listener line") + } else { + log.Info().Str("path", path).Msg("start to process event listener file") + } + + queryId := qce.Metadata.QueryId + if RecordRun { + queryId = RunName + "_" + queryId + } + + fileName := filepath.Base(path) + queryResult := &stage.QueryResult{ + StageId: pseudoStage.Id, + Query: &stage.Query{ + Text: qce.Metadata.Query, + File: &fileName, + ColdRun: true, + ExpectedRowCount: -1, // means disabled + }, + QueryId: queryId, + InfoUrl: qce.Metadata.Uri, + RowCount: int(qce.Statistics.OutputPositions), + StartTime: qce.CreateTime.Time, + EndTime: &qce.EndTime.Time, + } + + if qce.FailureInfo != nil { + // Need to set this so the run recorders will mark this query as failed. + queryResult.QueryError = fmt.Errorf("%s", qce.FailureInfo.ErrorCode.Name) + } + + // Unlike benchmarks run by pbench, we do not know when did the run start and finish when loading them from files. + // We infer that the whole run starts at min(queryStartTime) and ends at max(queryEndTime). + runStartTime.Synchronized(func(st *syncedTime) { + if queryResult.StartTime.Before(st.t) { + st.t = queryResult.StartTime + // Changes to the pseudoStage will be synced to the database by the run recorder. 
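+			// The syncedTime mutex serializes this update across concurrent task runners.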
+ pseudoStage.States.RunStartTime = queryResult.StartTime + } + }) + + if queryResult.EndTime != nil { + dur := queryResult.EndTime.Sub(queryResult.StartTime) + queryResult.Duration = &dur + runEndTime.Synchronized(func(st *syncedTime) { + if queryResult.EndTime.After(st.t) { + st.t = *queryResult.EndTime + } + }) + } + + // Insert into MySQL if configured + if mysqlDb != nil { + if err := insertEventListenerData(ctx, mysqlDb, qce, queryId); err != nil { + if lineNum > 0 { + log.Error().Err(err).Str("path", path).Int("line", lineNum).Msg("failed to insert event listener record") + } else { + log.Error().Err(err).Str("path", path).Msg("failed to insert event listener record") + } + return + } + } + + for _, r := range runRecorders { + r.RecordQuery(utils.GetCtxWithTimeout(time.Second*5), pseudoStage, queryResult) + } + + if lineNum > 0 { + log.Info().Str("path", path).Int("line", lineNum).Str("query_id", queryId).Msg("success") + } else { + log.Info().Str("path", path).Str("query_id", queryId).Msg("success") + } + resultChan <- queryResult +} + +func processPath(ctx context.Context, path string) error { + utils.ExpandHomeDirectory(&path) + stat, err := os.Stat(path) + if err != nil { + return err + } + if !stat.IsDir() { + // Skip dot files + if filepath.Base(path)[0] == '.' { + log.Debug().Str("path", path).Msg("skipping dot file") + return nil + } + scheduleFile(ctx, path) + return nil + } + + // Process directory recursively + return filepath.WalkDir(path, func(filePath string, d os.DirEntry, err error) error { + if err != nil { + log.Error().Err(err).Str("path", filePath).Msg("error walking directory") + return nil // Continue walking despite errors + } + + if ctx.Err() != nil { + log.Info().Msg("abort task scheduling") + return filepath.SkipAll + } + + // Skip dot files and dot directories + name := d.Name() + if name[0] == '.' 
{ + if d.IsDir() { + log.Debug().Str("path", filePath).Msg("skipping dot directory") + return filepath.SkipDir + } + log.Debug().Str("path", filePath).Msg("skipping dot file") + return nil + } + + // Only schedule regular files + if !d.IsDir() { + scheduleFile(ctx, filePath) + } + + return nil + }) +} + +func registerRunRecorder(r stage.RunRecorder) { + if r == nil || reflect.ValueOf(r).IsNil() { + return + } + runRecorders = append(runRecorders, r) +} diff --git a/cmd/loadeljson/synced_time.go b/cmd/loadeljson/synced_time.go new file mode 100644 index 0000000..eef8bcd --- /dev/null +++ b/cmd/loadeljson/synced_time.go @@ -0,0 +1,29 @@ +package loadeljson + +import ( + "sync" + "time" +) + +type syncedTime struct { + t time.Time + m sync.Mutex +} + +func newSyncedTime(t time.Time) *syncedTime { + return &syncedTime{ + t: t, + } +} + +func (st *syncedTime) Synchronized(f func(st *syncedTime)) { + st.m.Lock() + defer st.m.Unlock() + f(st) +} + +func (st *syncedTime) GetTime() time.Time { + st.m.Lock() + defer st.m.Unlock() + return st.t +} diff --git a/cmd/loadeljson/types.go b/cmd/loadeljson/types.go new file mode 100644 index 0000000..399bcd5 --- /dev/null +++ b/cmd/loadeljson/types.go @@ -0,0 +1,327 @@ +package loadeljson + +import ( + "encoding/json" + "fmt" + "time" +) + +// PrestoTime is a custom time type that can unmarshal from both string and numeric formats +type PrestoTime struct { + time.Time +} + +// ResourceGroupId is a custom type that can handle both string and array formats +type ResourceGroupId struct { + Value string +} + +func (pt *PrestoTime) UnmarshalJSON(b []byte) error { + var v interface{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + + switch value := v.(type) { + case string: + // Try parsing as RFC3339 format + t, err := time.Parse(time.RFC3339, value) + if err != nil { + // Try parsing as RFC3339Nano format + t, err = time.Parse(time.RFC3339Nano, value) + if err != nil { + return err + } + } + pt.Time = t + return nil + case float64: + // Assume it's seconds since epoch (Unix timestamp) + pt.Time = time.Unix(int64(value), int64((value-float64(int64(value)))*1e9)) + return nil + default: + return fmt.Errorf("invalid time type: %T", value) + } +} + +func (rg *ResourceGroupId) UnmarshalJSON(b []byte) error { + var v interface{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + + switch value := v.(type) { + case string: + rg.Value = value + return nil + case []interface{}: + // If it's an array, join the elements with "." + parts := make([]string, 0, len(value)) + for _, item := range value { + if str, ok := item.(string); ok { + parts = append(parts, str) + } + } + if len(parts) > 0 { + rg.Value = parts[0] + for i := 1; i < len(parts); i++ { + rg.Value += "." 
+ parts[i] + } + } + return nil + default: + return fmt.Errorf("invalid resource group id type: %T", value) + } +} + +// QueryEvent represents the event listener JSON structure +type QueryEvent struct { + InstanceId string `json:"instanceId"` + ClusterName string `json:"clusterName"` + QueryCreatedEvent *QueryCreatedEvent `json:"queryCreatedEvent,omitempty"` + QueryCompletedEvent *QueryCompletedEvent `json:"queryCompletedEvent,omitempty"` + SplitCompletedEvent interface{} `json:"splitCompletedEvent,omitempty"` + Plan string `json:"plan"` + CpuTimeMillis int64 `json:"cpuTimeMillis"` + RetriedCpuTimeMillis int64 `json:"retriedCpuTimeMillis"` + WallTimeMillis int64 `json:"wallTimeMillis"` + QueuedTimeMillis int64 `json:"queuedTimeMillis"` + AnalysisTimeMillis int64 `json:"analysisTimeMillis"` +} + +type QueryCreatedEvent struct { + CreateTime PrestoTime `json:"createTime"` + Context QueryContext `json:"context"` + Metadata QueryMetadata `json:"metadata"` +} + +type ResourceEstimates struct { + ExecutionTime *Duration `json:"executionTime,omitempty"` + CpuTime *Duration `json:"cpuTime,omitempty"` + PeakMemory *int64 `json:"peakMemory,omitempty"` + PeakTaskMemory *int64 `json:"peakTaskMemory,omitempty"` +} + +type QueryCompletedEvent struct { + Metadata QueryMetadata `json:"metadata"` + Statistics QueryStatistics `json:"statistics"` + Context QueryContext `json:"context"` + IoMetadata QueryIOMetadata `json:"ioMetadata"` + FailureInfo *QueryFailureInfo `json:"failureInfo,omitempty"` + Warnings []interface{} `json:"warnings"` + QueryType *string `json:"queryType,omitempty"` + FailedTasks []string `json:"failedTasks"` + CreateTime PrestoTime `json:"createTime"` + ExecutionStartTime PrestoTime `json:"executionStartTime"` + EndTime PrestoTime `json:"endTime"` + StageStatistics []StageStatistics `json:"stageStatistics"` + OperatorStatistics []OperatorStatistics `json:"operatorStatistics"` + PlanStatisticsRead []interface{} `json:"planStatisticsRead"` + PlanStatisticsWritten []interface{} `json:"planStatisticsWritten"` + PlanNodeHash interface{} `json:"planNodeHash"` + CanonicalPlan interface{} `json:"canonicalPlan"` + StatsEquivalentPlan *string `json:"statsEquivalentPlan,omitempty"` + ExpandedQuery *string `json:"expandedQuery,omitempty"` + OptimizerInformation []interface{} `json:"optimizerInformation"` + CteInformationList []interface{} `json:"cteInformationList"` + ScalarFunctions []string `json:"scalarFunctions"` + AggregateFunctions []string `json:"aggregateFunctions"` + WindowFunctions []string `json:"windowFunctions"` +} + +type QueryMetadata struct { + QueryId string `json:"queryId"` + TransactionId *string `json:"transactionId,omitempty"` + Query string `json:"query"` + QueryState string `json:"queryState"` + Uri string `json:"uri"` + Plan *string `json:"plan,omitempty"` + JsonPlan *string `json:"jsonPlan,omitempty"` + QueryHash *string `json:"queryHash,omitempty"` +} + +type QueryStatistics struct { + CpuTime Duration `json:"cpuTime"` + WallTime Duration `json:"wallTime"` + QueuedTime Duration `json:"queuedTime"` + AnalysisTime *Duration `json:"analysisTime,omitempty"` + PeakUserMemoryBytes int64 `json:"peakUserMemoryBytes"` + PeakTotalNonRevocableMemoryBytes int64 `json:"peakTotalNonRevocableMemoryBytes"` + PeakTaskUserMemory int64 `json:"peakTaskUserMemory"` + PeakTaskTotalMemory int64 `json:"peakTaskTotalMemory"` + PeakNodeTotalMemory int64 `json:"peakNodeTotalMemory"` + TotalBytes int64 `json:"totalBytes"` + TotalRows int64 `json:"totalRows"` + OutputPositions int64 
`json:"outputPositions"` + OutputBytes int64 `json:"outputBytes"` + WrittenOutputRows int64 `json:"writtenOutputRows"` + WrittenOutputBytes int64 `json:"writtenOutputBytes"` + CumulativeMemory float64 `json:"cumulativeMemory"` + CumulativeTotalMemory float64 `json:"cumulativeTotalMemory"` + CompletedSplits int `json:"completedSplits"` +} + +type QueryContext struct { + User string `json:"user"` + Principal *string `json:"principal,omitempty"` + RemoteClientAddress *string `json:"remoteClientAddress,omitempty"` + UserAgent *string `json:"userAgent,omitempty"` + ClientInfo *string `json:"clientInfo,omitempty"` + Source *string `json:"source,omitempty"` + Catalog *string `json:"catalog,omitempty"` + Schema *string `json:"schema,omitempty"` + ResourceGroupId *ResourceGroupId `json:"resourceGroupId,omitempty"` + SessionProperties map[string]string `json:"sessionProperties"` + ServerVersion string `json:"serverVersion"` + Environment string `json:"environment"` + ClientTags []string `json:"clientTags"` + ResourceEstimates ResourceEstimates `json:"resourceEstimates"` +} + +type QueryIOMetadata struct { + Inputs []interface{} `json:"inputs"` + Output interface{} `json:"output,omitempty"` +} + +type QueryFailureInfo struct { + ErrorCode ErrorCode `json:"errorCode"` + FailureType *string `json:"failureType,omitempty"` + FailureMessage *string `json:"failureMessage,omitempty"` + FailureTask *string `json:"failureTask,omitempty"` + FailureHost *string `json:"failureHost,omitempty"` + FailuresJson string `json:"failuresJson"` +} + +type ErrorCode struct { + Code int `json:"code"` + Name string `json:"name"` + Type string `json:"type"` +} + +type StageStatistics struct { + StageId int `json:"stageId"` + StageExecutionId int `json:"stageExecutionId"` + Tasks int `json:"tasks"` + TotalScheduledTime Duration `json:"totalScheduledTime"` + TotalCpuTime Duration `json:"totalCpuTime"` + RetriedCpuTime Duration `json:"retriedCpuTime"` + TotalBlockedTime Duration `json:"totalBlockedTime"` + RawInputDataSize DataSize `json:"rawInputDataSize"` + ProcessedInputDataSize DataSize `json:"processedInputDataSize"` + PhysicalWrittenDataSize DataSize `json:"physicalWrittenDataSize"` +} + +type OperatorStatistics struct { + StageId int `json:"stageId"` + StageExecutionId int `json:"stageExecutionId"` + PipelineId int `json:"pipelineId"` + OperatorId int `json:"operatorId"` + PlanNodeId string `json:"planNodeId"` + OperatorType string `json:"operatorType"` + TotalDrivers int64 `json:"totalDrivers"` + AddInputCalls int64 `json:"addInputCalls"` + AddInputWall Duration `json:"addInputWall"` + AddInputCpu Duration `json:"addInputCpu"` + AddInputAllocation DataSize `json:"addInputAllocation"` + RawInputDataSize DataSize `json:"rawInputDataSize"` + RawInputPositions int64 `json:"rawInputPositions"` + InputDataSize DataSize `json:"inputDataSize"` + InputPositions int64 `json:"inputPositions"` + GetOutputCalls int64 `json:"getOutputCalls"` + GetOutputWall Duration `json:"getOutputWall"` + GetOutputCpu Duration `json:"getOutputCpu"` + GetOutputAllocation DataSize `json:"getOutputAllocation"` + OutputDataSize DataSize `json:"outputDataSize"` + OutputPositions int64 `json:"outputPositions"` + PhysicalWrittenDataSize DataSize `json:"physicalWrittenDataSize"` + BlockedWall Duration `json:"blockedWall"` + FinishCalls int64 `json:"finishCalls"` + FinishWall Duration `json:"finishWall"` + FinishCpu Duration `json:"finishCpu"` + FinishAllocation DataSize `json:"finishAllocation"` + UserMemoryReservation DataSize 
`json:"userMemoryReservation"` + RevocableMemoryReservation DataSize `json:"revocableMemoryReservation"` + SystemMemoryReservation DataSize `json:"systemMemoryReservation"` + PeakUserMemoryReservation DataSize `json:"peakUserMemoryReservation"` + PeakSystemMemoryReservation DataSize `json:"peakSystemMemoryReservation"` + PeakTotalMemoryReservation DataSize `json:"peakTotalMemoryReservation"` + SpilledDataSize DataSize `json:"spilledDataSize"` + Info *string `json:"info,omitempty"` +} + +// Duration represents a time duration in the format used by Presto +type Duration struct { + time.Duration +} + +func (d *Duration) UnmarshalJSON(b []byte) error { + var v interface{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + switch value := v.(type) { + case float64: + d.Duration = time.Duration(value) * time.Millisecond + return nil + case string: + // Parse Presto duration format: "43.99d", "1.5h", "30m", "45s", "100ms", etc. + var amount float64 + var unit string + n, err := fmt.Sscanf(value, "%f%s", &amount, &unit) + if err != nil || n != 2 { + // Try standard Go duration format as fallback + d.Duration, err = time.ParseDuration(value) + return err + } + + // Convert Presto units to Go duration + switch unit { + case "d": + d.Duration = time.Duration(amount * float64(24*time.Hour)) + case "h": + d.Duration = time.Duration(amount * float64(time.Hour)) + case "m": + d.Duration = time.Duration(amount * float64(time.Minute)) + case "s": + d.Duration = time.Duration(amount * float64(time.Second)) + case "ms": + d.Duration = time.Duration(amount * float64(time.Millisecond)) + case "us": + d.Duration = time.Duration(amount * float64(time.Microsecond)) + case "ns": + d.Duration = time.Duration(amount * float64(time.Nanosecond)) + default: + return fmt.Errorf("unknown duration unit: %s", unit) + } + return nil + default: + return fmt.Errorf("invalid duration type") + } +} + +// DataSize represents a data size in bytes +type DataSize struct { + Bytes int64 +} + +func (ds *DataSize) UnmarshalJSON(b []byte) error { + var v interface{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + switch value := v.(type) { + case float64: + ds.Bytes = int64(value) + return nil + case string: + // Parse string like "1.5MB", "100KB", etc. 
+ // For simplicity, we'll just try to extract the number + var size float64 + var unit string + fmt.Sscanf(value, "%f%s", &size, &unit) + ds.Bytes = int64(size) + return nil + default: + return fmt.Errorf("invalid data size type") + } +} diff --git a/stage/mysql_run_recorder.go b/stage/mysql_run_recorder.go index 7c56aca..f507829 100644 --- a/stage/mysql_run_recorder.go +++ b/stage/mysql_run_recorder.go @@ -4,9 +4,10 @@ import ( "context" "database/sql" _ "embed" - _ "github.com/go-sql-driver/mysql" "pbench/log" "pbench/utils" + + _ "github.com/go-sql-driver/mysql" ) var ( @@ -48,7 +49,7 @@ func NewMySQLRunRecorderWithDb(db *sql.DB) *MySQLRunRecorder { } func (m *MySQLRunRecorder) Start(_ context.Context, s *Stage) error { - recordNewRun := `INSERT INTO pbench_runs (run_name, cluster_fqdn, start_time, queries_ran, failed, mismatch, comment) + recordNewRun := `REPLACE INTO pbench_runs (run_name, cluster_fqdn, start_time, queries_ran, failed, mismatch, comment) VALUES (?, ?, ?, 0, 0, 0, ?)` res, err := m.db.Exec(recordNewRun, s.States.RunName, s.States.ServerFQDN, s.States.RunStartTime, s.States.Comment) if err != nil { @@ -64,7 +65,7 @@ VALUES (?, ?, ?, 0, 0, 0, ?)` } func (m *MySQLRunRecorder) RecordQuery(_ context.Context, s *Stage, result *QueryResult) { - recordNewQuery := `INSERT INTO pbench_queries (run_id, stage_id, query_file, query_index, query_id, sequence_no, + recordNewQuery := `REPLACE INTO pbench_queries (run_id, stage_id, query_file, query_index, query_id, sequence_no, cold_run, succeeded, start_time, end_time, row_count, expected_row_count, duration_ms, info_url) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` var queryFile string if result.Query.File != nil {