From 748af6a966978662b93910d15e71682b42be5648 Mon Sep 17 00:00:00 2001 From: unidevel Date: Sat, 15 Nov 2025 08:15:21 +0000 Subject: [PATCH 1/5] Add loadeljson --- cmd/loadeljson.go | 47 +++++ cmd/loadeljson/db.go | 385 ++++++++++++++++++++++++++++++++++ cmd/loadeljson/main.go | 265 +++++++++++++++++++++++ cmd/loadeljson/synced_time.go | 29 +++ cmd/loadeljson/types.go | 327 +++++++++++++++++++++++++++++ 5 files changed, 1053 insertions(+) create mode 100644 cmd/loadeljson.go create mode 100644 cmd/loadeljson/db.go create mode 100644 cmd/loadeljson/main.go create mode 100644 cmd/loadeljson/synced_time.go create mode 100644 cmd/loadeljson/types.go diff --git a/cmd/loadeljson.go b/cmd/loadeljson.go new file mode 100644 index 0000000..c295a1f --- /dev/null +++ b/cmd/loadeljson.go @@ -0,0 +1,47 @@ +package cmd + +import ( + "fmt" + "os" + "pbench/cmd/loadeljson" + "pbench/utils" + "runtime" + "time" + + "github.com/spf13/cobra" +) + +// loadElJsonCmd represents the loadeljson command +var loadElJsonCmd = &cobra.Command{ + Use: `loadeljson [flags] [list of files or directories to process]`, + Short: "Load event listener JSON files into database and run recorders", + Long: `Load event listener JSON files (QueryCompletedEvent) into database and run recorders`, + DisableFlagsInUseLine: true, + Args: func(cmd *cobra.Command, args []string) error { + if len(args) < 1 { + return fmt.Errorf("requires at least 1 arg, only received %d", len(args)) + } + if loadeljson.Parallelism < 1 || loadeljson.Parallelism > runtime.NumCPU() { + return fmt.Errorf("invalid parallelism: %d, it should be >= 1 and <= %d", loadeljson.Parallelism, runtime.NumCPU()) + } + utils.ExpandHomeDirectory(&loadeljson.MySQLCfgPath) + utils.ExpandHomeDirectory(&loadeljson.InfluxCfgPath) + utils.ExpandHomeDirectory(&loadeljson.OutputPath) + return nil + }, + Run: loadeljson.Run, +} + +func init() { + RootCmd.AddCommand(loadElJsonCmd) + wd, _ := os.Getwd() + loadElJsonCmd.Flags().StringVarP(&loadeljson.RunName, "name", "n", fmt.Sprintf("load_el_%s", time.Now().Format(utils.DirectoryNameTimeFormat)), `Assign a name to this run. (default: "load_el_")`) + loadElJsonCmd.Flags().StringVarP(&loadeljson.Comment, "comment", "c", "", `Add a comment to this run (optional)`) + loadElJsonCmd.Flags().BoolVarP(&loadeljson.RecordRun, "record-run", "r", false, "Record all the loaded JSON as a run") + loadElJsonCmd.Flags().StringVarP(&loadeljson.OutputPath, "output-path", "o", wd, "Output directory path") + loadElJsonCmd.Flags().IntVarP(&loadeljson.Parallelism, "parallel", "P", runtime.NumCPU(), "Number of parallel threads to load json files") + loadElJsonCmd.Flags().StringVar(&loadeljson.InfluxCfgPath, "influx", "", "InfluxDB connection config for run recorder (optional)") + loadElJsonCmd.Flags().StringVar(&loadeljson.MySQLCfgPath, "mysql", "", "MySQL connection config for event listener and run recorder (optional)") +} + +// Made with Bob diff --git a/cmd/loadeljson/db.go b/cmd/loadeljson/db.go new file mode 100644 index 0000000..e81c6ce --- /dev/null +++ b/cmd/loadeljson/db.go @@ -0,0 +1,385 @@ +package loadeljson + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "pbench/log" +) + +// insertEventListenerData inserts data into multiple tables to mirror Java implementation in MySQLWriter.post() +func insertEventListenerData(ctx context.Context, db *sql.DB, qce *QueryCompletedEvent, queryId string) error { + // 1. 
Insert into presto_query_creation_info + if err := insertQueryCreationInfo(ctx, db, qce, queryId); err != nil { + return fmt.Errorf("failed to insert query creation info: %w", err) + } + + // 2. Insert into presto_query_plans + if err := insertQueryPlans(ctx, db, qce, queryId); err != nil { + return fmt.Errorf("failed to insert query plans: %w", err) + } + + // 3. Insert into presto_query_stage_stats + if err := insertQueryStageStats(ctx, db, qce, queryId); err != nil { + return fmt.Errorf("failed to insert query stage stats: %w", err) + } + + // 4. Insert into presto_query_operator_stats + if err := insertQueryOperatorStats(ctx, db, qce, queryId); err != nil { + return fmt.Errorf("failed to insert query operator stats: %w", err) + } + + // 5. Insert into presto_query_statistics + if err := insertQueryStatistics(ctx, db, qce, queryId); err != nil { + return fmt.Errorf("failed to insert query statistics: %w", err) + } + + return nil +} + +func insertQueryCreationInfo(ctx context.Context, db *sql.DB, qce *QueryCompletedEvent, queryId string) error { + query := `INSERT INTO presto_query_creation_info( + query_id, query, create_time, schema_name, catalog_name, environment, + user, remote_client_address, source, user_agent, uri, + session_properties_json, server_version, client_info, resource_group_name, + principal, transaction_id, client_tags, resource_estimates, dt + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + + var schema, catalog, remoteAddr, source, userAgent, clientInfo, principal, resourceGroupId string + if qce.Context.Schema != nil { + schema = *qce.Context.Schema + } + if qce.Context.Catalog != nil { + catalog = *qce.Context.Catalog + } + if qce.Context.RemoteClientAddress != nil { + remoteAddr = *qce.Context.RemoteClientAddress + } + if qce.Context.Source != nil { + source = *qce.Context.Source + } + if qce.Context.UserAgent != nil { + userAgent = *qce.Context.UserAgent + } + if qce.Context.ClientInfo != nil { + clientInfo = *qce.Context.ClientInfo + } + if qce.Context.Principal != nil { + principal = *qce.Context.Principal + } + if qce.Context.ResourceGroupId != nil { + resourceGroupId = qce.Context.ResourceGroupId.Value + } + + // Handle transaction_id + var transactionId string + if qce.Metadata.TransactionId != nil { + transactionId = *qce.Metadata.TransactionId + } + + // Marshal client_tags + clientTagsJson, err := json.Marshal(qce.Context.ClientTags) + if err != nil { + log.Error().Err(err).Str("query_id", queryId).Msg("failed to marshal client tags") + clientTagsJson = []byte("[]") + } + + // Marshal resource_estimates + resourceEstimatesJson, err := json.Marshal(qce.Context.ResourceEstimates) + if err != nil { + log.Error().Err(err).Str("query_id", queryId).Msg("failed to marshal resource estimates") + resourceEstimatesJson = []byte("{}") + } + + sessionPropsJson, err := json.Marshal(qce.Context.SessionProperties) + if err != nil { + log.Error().Err(err).Str("query_id", queryId).Msg("failed to marshal session properties") + sessionPropsJson = []byte("{}") + } + + dt := qce.CreateTime.Time.Format("2006-01-02 15:04:05") + + _, err = db.ExecContext(ctx, query, + queryId, qce.Metadata.Query, + qce.CreateTime.Time.Format("2006-01-02 15:04:05"), + schema, catalog, qce.Context.Environment, + qce.Context.User, remoteAddr, source, userAgent, qce.Metadata.Uri, + string(sessionPropsJson), qce.Context.ServerVersion, clientInfo, resourceGroupId, + principal, transactionId, string(clientTagsJson), string(resourceEstimatesJson), dt, + ) + + return err +} + 
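The nil-pointer guards above are repeated almost verbatim in insertQueryStatistics further down. A small generic helper could collapse them; the following is a sketch only, and deref is a hypothetical name rather than something this patch adds:

    // deref returns the value p points to, or the zero value of T when p is nil.
    func deref[T any](p *T) T {
        if p == nil {
            var zero T
            return zero
        }
        return *p
    }

Each guard then becomes a one-liner such as schema := deref(qce.Context.Schema).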
+func insertQueryPlans(ctx context.Context, db *sql.DB, qce *QueryCompletedEvent, queryId string) error { + query := `INSERT INTO presto_query_plans( + query_id, query, plan, json_plan, environment, dt + ) VALUES (?, ?, ?, ?, ?, ?)` + + var plan, jsonPlan string + if qce.Metadata.Plan != nil { + plan = *qce.Metadata.Plan + } + if qce.Metadata.JsonPlan != nil { + jsonPlan = *qce.Metadata.JsonPlan + } + + dt := qce.CreateTime.Time.Format("2006-01-02 15:04:05") + + _, err := db.ExecContext(ctx, query, + queryId, qce.Metadata.Query, plan, jsonPlan, qce.Context.Environment, dt, + ) + + return err +} + +func insertQueryStageStats(ctx context.Context, db *sql.DB, qce *QueryCompletedEvent, queryId string) error { + if len(qce.StageStatistics) == 0 { + return nil + } + + query := `INSERT INTO presto_query_stage_stats( + query_id, stage_id, stage_execution_id, tasks, + total_scheduled_time_ms, total_cpu_time_ms, retried_cpu_time_ms, total_blocked_time_ms, + raw_input_data_size_bytes, processed_input_data_size_bytes, physical_written_data_size_bytes, + gc_statistics, cpu_distribution, memory_distribution, dt + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + + dt := qce.CreateTime.Time.Format("2006-01-02 15:04:05") + + for _, stage := range qce.StageStatistics { + // These fields are not available in the event listener JSON, so we use empty JSON objects + gcStatsJson := []byte("{}") + cpuDistJson := []byte("{}") + memDistJson := []byte("{}") + + _, err := db.ExecContext(ctx, query, + queryId, stage.StageId, stage.StageExecutionId, stage.Tasks, + stage.TotalScheduledTime.Milliseconds(), + stage.TotalCpuTime.Milliseconds(), + stage.RetriedCpuTime.Milliseconds(), + stage.TotalBlockedTime.Milliseconds(), + stage.RawInputDataSize.Bytes, + stage.ProcessedInputDataSize.Bytes, + stage.PhysicalWrittenDataSize.Bytes, + string(gcStatsJson), string(cpuDistJson), string(memDistJson), dt, + ) + + if err != nil { + return fmt.Errorf("failed to insert stage %d: %w", stage.StageId, err) + } + } + + return nil +} + +func insertQueryOperatorStats(ctx context.Context, db *sql.DB, qce *QueryCompletedEvent, queryId string) error { + if len(qce.OperatorStatistics) == 0 { + return nil + } + + query := `INSERT INTO presto_query_operator_stats( + query_id, stage_id, stage_execution_id, pipeline_id, operator_id, + plan_node_id, operator_type, total_drivers, + add_input_calls, add_input_wall_ms, add_input_cpu_ms, add_input_allocation_bytes, + raw_input_data_size_bytes, raw_input_positions, input_data_size_bytes, input_positions, + sum_squared_input_positions, get_output_calls, get_output_wall_ms, get_output_cpu_ms, + get_output_allocation_bytes, output_data_size_bytes, output_positions, + physical_written_data_size_bytes, blocked_wall_ms, finish_calls, finish_wall_ms, + finish_cpu_ms, finish_allocation_bytes, user_memory_reservation_bytes, + revocable_memory_reservation_bytes, system_memory_reservation_bytes, + peak_user_memory_reservation_bytes, peak_system_memory_reservation_bytes, + peak_total_memory_reservation_bytes, spilled_data_size_bytes, info, dt + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + + dt := qce.CreateTime.Time.Format("2006-01-02 15:04:05") + + for _, op := range qce.OperatorStatistics { + var info string + if op.Info != nil { + info = *op.Info + } + + _, err := db.ExecContext(ctx, query, + queryId, op.StageId, op.StageExecutionId, op.PipelineId, op.OperatorId, + op.PlanNodeId, op.OperatorType, op.TotalDrivers, + 
op.AddInputCalls, op.AddInputWall.Milliseconds(), op.AddInputCpu.Milliseconds(), + op.AddInputAllocation.Bytes, op.RawInputDataSize.Bytes, op.RawInputPositions, + op.InputDataSize.Bytes, op.InputPositions, 0.0, // sumSquaredInputPositions not available + op.GetOutputCalls, op.GetOutputWall.Milliseconds(), op.GetOutputCpu.Milliseconds(), + op.GetOutputAllocation.Bytes, op.OutputDataSize.Bytes, op.OutputPositions, + op.PhysicalWrittenDataSize.Bytes, op.BlockedWall.Milliseconds(), op.FinishCalls, + op.FinishWall.Milliseconds(), op.FinishCpu.Milliseconds(), op.FinishAllocation.Bytes, + op.UserMemoryReservation.Bytes, op.RevocableMemoryReservation.Bytes, + op.SystemMemoryReservation.Bytes, op.PeakUserMemoryReservation.Bytes, + op.PeakSystemMemoryReservation.Bytes, op.PeakTotalMemoryReservation.Bytes, + op.SpilledDataSize.Bytes, info, dt, + ) + + if err != nil { + return fmt.Errorf("failed to insert operator %d: %w", op.OperatorId, err) + } + } + + return nil +} + +func insertQueryStatistics(ctx context.Context, db *sql.DB, qce *QueryCompletedEvent, queryId string) error { + // Insert into presto_query_statistics table + // This mirrors the Java implementation in PrestoQueryStatsDao.insertQueryStatistics + + query := `INSERT INTO presto_query_statistics( + query_id, query, query_type, schema_name, catalog_name, environment, + user, remote_client_address, source, user_agent, uri, + session_properties_json, server_version, client_info, resource_group_name, + principal, transaction_id, client_tags, resource_estimates, + create_time, end_time, execution_start_time, query_state, + failure_message, failure_type, failures_json, failure_task, failure_host, + error_code, error_code_name, error_category, warnings_json, + splits, analysis_time_ms, queued_time_ms, query_wall_time_ms, + query_execution_time_ms, bytes_per_cpu_sec, bytes_per_sec, rows_per_cpu_sec, + total_bytes, total_rows, output_rows, output_bytes, + written_rows, written_bytes, cumulative_memory, peak_user_memory_bytes, + peak_total_memory_bytes, peak_task_total_memory, peak_task_user_memory, + written_intermediate_bytes, peak_node_total_memory, total_split_cpu_time_ms, + stage_count, cumulative_total_memory, dt + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + + var queryType string + if qce.QueryType != nil { + queryType = *qce.QueryType + } + + var schema, catalog, remoteAddr, source, userAgent, clientInfo, principal, resourceGroupId string + if qce.Context.Schema != nil { + schema = *qce.Context.Schema + } + if qce.Context.Catalog != nil { + catalog = *qce.Context.Catalog + } + if qce.Context.RemoteClientAddress != nil { + remoteAddr = *qce.Context.RemoteClientAddress + } + if qce.Context.Source != nil { + source = *qce.Context.Source + } + if qce.Context.UserAgent != nil { + userAgent = *qce.Context.UserAgent + } + if qce.Context.ClientInfo != nil { + clientInfo = *qce.Context.ClientInfo + } + if qce.Context.Principal != nil { + principal = *qce.Context.Principal + } + if qce.Context.ResourceGroupId != nil { + resourceGroupId = qce.Context.ResourceGroupId.Value + } + + var failureMsg, failureType, failureTask, failureHost, failuresJson string + var errorCode int + var errorCodeName, errorCategory string + + if qce.FailureInfo != nil { + if qce.FailureInfo.FailureMessage != nil { + failureMsg = *qce.FailureInfo.FailureMessage + } + if qce.FailureInfo.FailureType != nil { + failureType = 
*qce.FailureInfo.FailureType + } + if qce.FailureInfo.FailureTask != nil { + failureTask = *qce.FailureInfo.FailureTask + } + if qce.FailureInfo.FailureHost != nil { + failureHost = *qce.FailureInfo.FailureHost + } + failuresJson = qce.FailureInfo.FailuresJson + errorCode = qce.FailureInfo.ErrorCode.Code + errorCodeName = qce.FailureInfo.ErrorCode.Name + errorCategory = qce.FailureInfo.ErrorCode.Type + } + + // Handle transaction_id + var transactionId string + if qce.Metadata.TransactionId != nil { + transactionId = *qce.Metadata.TransactionId + } + + // Marshal client_tags + clientTagsJson, err := json.Marshal(qce.Context.ClientTags) + if err != nil { + log.Error().Err(err).Str("query_id", queryId).Msg("failed to marshal client tags") + clientTagsJson = []byte("[]") + } + + // Marshal resource_estimates + resourceEstimatesJson, err := json.Marshal(qce.Context.ResourceEstimates) + if err != nil { + log.Error().Err(err).Str("query_id", queryId).Msg("failed to marshal resource estimates") + resourceEstimatesJson = []byte("{}") + } + + sessionPropsJson, err := json.Marshal(qce.Context.SessionProperties) + if err != nil { + log.Error().Err(err).Str("query_id", queryId).Msg("failed to marshal session properties") + sessionPropsJson = []byte("{}") + } + warningsJson, err := json.Marshal(qce.Warnings) + if err != nil { + log.Error().Err(err).Str("query_id", queryId).Msg("failed to marshal warnings") + warningsJson = []byte("[]") + } + + var analysisTimeMs int64 + if qce.Statistics.AnalysisTime != nil { + analysisTimeMs = qce.Statistics.AnalysisTime.Milliseconds() + } + + // Calculate query_execution_time_ms from ResourceEstimates + var queryExecutionTimeMs int64 + if qce.Context.ResourceEstimates.ExecutionTime != nil { + queryExecutionTimeMs = qce.Context.ResourceEstimates.ExecutionTime.Milliseconds() + } + + cpuTimeMs := qce.Statistics.CpuTime.Milliseconds() + var bytesPerCpuSec, bytesPerSec, rowsPerCpuSec int64 + if cpuTimeMs > 0 { + bytesPerCpuSec = qce.Statistics.TotalBytes / cpuTimeMs + rowsPerCpuSec = qce.Statistics.TotalRows / cpuTimeMs + } + // Calculate bytes_per_sec from ResourceEstimates.ExecutionTime + if queryExecutionTimeMs > 0 { + bytesPerSec = qce.Statistics.TotalBytes / queryExecutionTimeMs + } + + dt := qce.CreateTime.Time.Format("2006-01-02 15:04:05") + + _, err = db.ExecContext(ctx, query, + queryId, qce.Metadata.Query, queryType, schema, catalog, qce.Context.Environment, + qce.Context.User, remoteAddr, source, userAgent, qce.Metadata.Uri, + string(sessionPropsJson), qce.Context.ServerVersion, clientInfo, resourceGroupId, + principal, transactionId, string(clientTagsJson), string(resourceEstimatesJson), + qce.CreateTime.Time.Format("2006-01-02 15:04:05"), + qce.EndTime.Time.Format("2006-01-02 15:04:05"), + qce.ExecutionStartTime.Time.Format("2006-01-02 15:04:05"), + qce.Metadata.QueryState, + failureMsg, failureType, failuresJson, failureTask, failureHost, + errorCode, errorCodeName, errorCategory, string(warningsJson), + qce.Statistics.CompletedSplits, analysisTimeMs, + qce.Statistics.QueuedTime.Milliseconds(), + qce.Statistics.WallTime.Milliseconds(), + queryExecutionTimeMs, + bytesPerCpuSec, bytesPerSec, rowsPerCpuSec, + qce.Statistics.TotalBytes, qce.Statistics.TotalRows, + qce.Statistics.OutputPositions, qce.Statistics.OutputBytes, + qce.Statistics.WrittenOutputRows, qce.Statistics.WrittenOutputBytes, qce.Statistics.CumulativeMemory, + qce.Statistics.PeakUserMemoryBytes, + qce.Statistics.PeakTotalNonRevocableMemoryBytes, qce.Statistics.PeakTaskTotalMemory, 
qce.Statistics.PeakTaskUserMemory,
+		qce.Statistics.WrittenOutputBytes, qce.Statistics.PeakNodeTotalMemory,
+		cpuTimeMs,
+		len(qce.StageStatistics), qce.Statistics.CumulativeTotalMemory, dt,
+	)
+
+	return err
+}
diff --git a/cmd/loadeljson/main.go b/cmd/loadeljson/main.go
new file mode 100644
index 0000000..b4c7186
--- /dev/null
+++ b/cmd/loadeljson/main.go
@@ -0,0 +1,265 @@
+package loadeljson
+
+import (
+	"context"
+	"database/sql"
+	"encoding/json"
+	"fmt"
+	"os"
+	"os/signal"
+	"path/filepath"
+	"pbench/log"
+	"pbench/stage"
+	"pbench/utils"
+	"reflect"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/spf13/cobra"
+)
+
+var (
+	RunName       string
+	Comment       string
+	OutputPath    string
+	RecordRun     bool
+	MySQLCfgPath  string
+	InfluxCfgPath string
+	Parallelism   int
+
+	runRecorders             = make([]stage.RunRecorder, 0, 3)
+	queryResults             = make([]*stage.QueryResult, 0, 8)
+	runStartTime, runEndTime = newSyncedTime(time.Now()), newSyncedTime(time.UnixMilli(0))
+	mysqlDb                  *sql.DB
+	pseudoStage              *stage.Stage
+
+	parallelismGuard chan struct{}
+	resultChan       = make(chan *stage.QueryResult)
+	runningTasks     sync.WaitGroup
+)
+
+func Run(_ *cobra.Command, args []string) {
+	OutputPath = filepath.Join(OutputPath, RunName)
+	utils.PrepareOutputDirectory(OutputPath)
+
+	// Also start to write logs to the output directory from this point on.
+	logPath := filepath.Join(OutputPath, "loadeljson.log")
+	flushLog := utils.InitLogFile(logPath)
+	defer flushLog()
+
+	// Any error during run recorder initialization will make the run recorder a noop.
+	// The program will continue with corresponding error logs.
+	mysqlDb = utils.InitMySQLConnFromCfg(MySQLCfgPath)
+	if RecordRun {
+		registerRunRecorder(stage.NewFileBasedRunRecorder())
+		registerRunRecorder(stage.NewInfluxRunRecorder(InfluxCfgPath))
+		registerRunRecorder(stage.NewMySQLRunRecorderWithDb(mysqlDb))
+	}
+
+	log.Info().Int("parallelism", Parallelism).Send()
+	ctx, cancel := context.WithCancel(context.Background())
+	timeToExit := make(chan os.Signal, 1)
+	signal.Notify(timeToExit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+	// Handle SIGINT, SIGTERM, and SIGQUIT. When ctx is canceled, in-progress MySQL transactions and InfluxDB operations will roll back.
+	go func() {
+		sig := <-timeToExit
+		if sig != nil {
+			log.Info().Msg("abort loading")
+			cancel()
+		}
+	}()
+
+	// To reuse the `pbench run` code, especially the run recorders, we create a pseudo main stage.
+	pseudoStage = &stage.Stage{
+		Id:       "load_el_json",
+		ColdRuns: &stage.RunsValueOne,
+		States: &stage.SharedStageStates{
+			RunName:      RunName,
+			Comment:      Comment,
+			OutputPath:   OutputPath,
+			RunStartTime: time.Now(),
+		},
+	}
+
+	// Kick off preparation work for all the run recorders
+	for _, recorder := range runRecorders {
+		if err := recorder.Start(ctx, pseudoStage); err != nil {
+			log.Fatal().Err(err).Msgf("failed to prepare %s", reflect.TypeOf(recorder).Name())
+		}
+	}
+
+	// Use this to make sure there will be no more than Parallelism goroutines.
+	parallelismGuard = make(chan struct{}, Parallelism)
+
+	// This is the task scheduler goroutine. It feeds files to task runners with back pressure.
+	go func() {
+		for _, path := range args {
+			if ctx.Err() != nil {
+				break
+			}
+			if err := processPath(ctx, path); err != nil {
+				// This whole command is not abort-on-error. We only log errors.
+				log.Error().Str("path", path).Err(err).Msg("failed to process path")
+			}
+		}
+		// Keep the main thread waiting for queryResults until all task runners finish.
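+		// Closing resultChan is what lets the result-collection loop in the main goroutine exit.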
+		runningTasks.Wait()
+		close(resultChan)
+	}()
+
+	for qr := range resultChan {
+		queryResults = append(queryResults, qr)
+	}
+
+	pseudoStage.States.RunStartTime = runStartTime.GetTime()
+	pseudoStage.States.RunFinishTime = runEndTime.GetTime()
+	for _, r := range runRecorders {
+		r.RecordRun(utils.GetCtxWithTimeout(time.Second*5), pseudoStage, queryResults)
+	}
+
+	log.Info().Int("file_loaded", len(queryResults)).Send()
+	// This causes the signal handler to exit.
+	close(timeToExit)
+}
+
+func scheduleFile(ctx context.Context, path string) {
+	parallelismGuard <- struct{}{}
+	runningTasks.Add(1)
+	go processFile(ctx, path)
+}
+
+func processFile(ctx context.Context, path string) {
+	defer func() {
+		// Allow another task runner to start.
+		<-parallelismGuard
+		runningTasks.Done()
+	}()
+
+	bytes, ioErr := os.ReadFile(path)
+	if ioErr != nil {
+		log.Error().Err(ioErr).Str("path", path).Msg("failed to read file")
+		return
+	}
+
+	queryEvent := new(QueryEvent)
+	// Note that this step can succeed with any valid JSON file. But we need to do some additional validation to skip
+	// invalid event listener JSON files.
+	if unmarshalErr := json.Unmarshal(bytes, queryEvent); unmarshalErr != nil {
+		log.Error().Err(unmarshalErr).Str("path", path).Msg("failed to unmarshal JSON")
+		return
+	}
+
+	// Validate that this is a QueryCompletedEvent
+	if queryEvent.QueryCompletedEvent == nil {
+		log.Error().Str("path", path).Msg("no QueryCompletedEvent found in file")
+		return
+	}
+
+	qce := queryEvent.QueryCompletedEvent
+	if qce.Metadata.QueryId == "" || qce.CreateTime.Time.IsZero() {
+		log.Error().Str("path", path).Msg("invalid QueryCompletedEvent: missing queryId or createTime")
+		return
+	}
+
+	// Copy the Plan from QueryEvent to QueryMetadata
+	qce.Metadata.Plan = &queryEvent.Plan
+
+	log.Info().Str("path", path).Msg("start to process event listener file")
+
+	queryId := qce.Metadata.QueryId
+	if RecordRun {
+		queryId = RunName + "_" + queryId
+	}
+
+	fileName := filepath.Base(path)
+	queryResult := &stage.QueryResult{
+		StageId: pseudoStage.Id,
+		Query: &stage.Query{
+			Text:             qce.Metadata.Query,
+			File:             &fileName,
+			ColdRun:          true,
+			ExpectedRowCount: -1, // means disabled
+		},
+		QueryId:   queryId,
+		InfoUrl:   qce.Metadata.Uri,
+		RowCount:  int(qce.Statistics.OutputPositions),
+		StartTime: qce.CreateTime.Time,
+		EndTime:   &qce.EndTime.Time,
+	}
+
+	if qce.FailureInfo != nil {
+		// Need to set this so the run recorders will mark this query as failed.
+		queryResult.QueryError = fmt.Errorf("%s", qce.FailureInfo.ErrorCode.Name)
+	}
+
+	// Unlike benchmarks run by pbench, we do not know when the run started and finished, since we are loading results from files.
+	// We infer that the whole run starts at min(queryStartTime) and ends at max(queryEndTime).
+	runStartTime.Synchronized(func(st *syncedTime) {
+		if queryResult.StartTime.Before(st.t) {
+			st.t = queryResult.StartTime
+			// Changes to the pseudoStage will be synced to the database by the run recorder.
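+			// (Run sets RunStartTime again from runStartTime.GetTime() once all tasks have finished.)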
+ pseudoStage.States.RunStartTime = queryResult.StartTime + } + }) + + if queryResult.EndTime != nil { + dur := queryResult.EndTime.Sub(queryResult.StartTime) + queryResult.Duration = &dur + runEndTime.Synchronized(func(st *syncedTime) { + if queryResult.EndTime.After(st.t) { + st.t = *queryResult.EndTime + } + }) + } + + // Insert into MySQL if configured + if mysqlDb != nil { + if err := insertEventListenerData(ctx, mysqlDb, qce, queryId); err != nil { + log.Error().Err(err).Str("path", path).Msg("failed to insert event listener record") + return + } + } + + for _, r := range runRecorders { + r.RecordQuery(utils.GetCtxWithTimeout(time.Second*5), pseudoStage, queryResult) + } + + log.Info().Str("path", path).Str("query_id", queryId).Msg("success") + resultChan <- queryResult +} + +func processPath(ctx context.Context, path string) error { + utils.ExpandHomeDirectory(&path) + stat, err := os.Stat(path) + if err != nil { + return err + } + if !stat.IsDir() { + scheduleFile(ctx, path) + return nil + } + entries, err := os.ReadDir(path) + if err != nil { + return err + } + for _, entry := range entries { + if ctx.Err() != nil { + log.Info().Msg("abort task scheduling") + break + } + if entry.IsDir() { + continue + } + fullPath := filepath.Join(path, entry.Name()) + scheduleFile(ctx, fullPath) + } + return nil +} + +func registerRunRecorder(r stage.RunRecorder) { + if r == nil || reflect.ValueOf(r).IsNil() { + return + } + runRecorders = append(runRecorders, r) +} diff --git a/cmd/loadeljson/synced_time.go b/cmd/loadeljson/synced_time.go new file mode 100644 index 0000000..eef8bcd --- /dev/null +++ b/cmd/loadeljson/synced_time.go @@ -0,0 +1,29 @@ +package loadeljson + +import ( + "sync" + "time" +) + +type syncedTime struct { + t time.Time + m sync.Mutex +} + +func newSyncedTime(t time.Time) *syncedTime { + return &syncedTime{ + t: t, + } +} + +func (st *syncedTime) Synchronized(f func(st *syncedTime)) { + st.m.Lock() + defer st.m.Unlock() + f(st) +} + +func (st *syncedTime) GetTime() time.Time { + st.m.Lock() + defer st.m.Unlock() + return st.t +} diff --git a/cmd/loadeljson/types.go b/cmd/loadeljson/types.go new file mode 100644 index 0000000..399bcd5 --- /dev/null +++ b/cmd/loadeljson/types.go @@ -0,0 +1,327 @@ +package loadeljson + +import ( + "encoding/json" + "fmt" + "time" +) + +// PrestoTime is a custom time type that can unmarshal from both string and numeric formats +type PrestoTime struct { + time.Time +} + +// ResourceGroupId is a custom type that can handle both string and array formats +type ResourceGroupId struct { + Value string +} + +func (pt *PrestoTime) UnmarshalJSON(b []byte) error { + var v interface{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + + switch value := v.(type) { + case string: + // Try parsing as RFC3339 format + t, err := time.Parse(time.RFC3339, value) + if err != nil { + // Try parsing as RFC3339Nano format + t, err = time.Parse(time.RFC3339Nano, value) + if err != nil { + return err + } + } + pt.Time = t + return nil + case float64: + // Assume it's seconds since epoch (Unix timestamp) + pt.Time = time.Unix(int64(value), int64((value-float64(int64(value)))*1e9)) + return nil + default: + return fmt.Errorf("invalid time type: %T", value) + } +} + +func (rg *ResourceGroupId) UnmarshalJSON(b []byte) error { + var v interface{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + + switch value := v.(type) { + case string: + rg.Value = value + return nil + case []interface{}: + // If it's an array, join the elements with "." 
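+		// e.g. a hierarchical id ["global", "adhoc"] becomes "global.adhoc".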
+ parts := make([]string, 0, len(value)) + for _, item := range value { + if str, ok := item.(string); ok { + parts = append(parts, str) + } + } + if len(parts) > 0 { + rg.Value = parts[0] + for i := 1; i < len(parts); i++ { + rg.Value += "." + parts[i] + } + } + return nil + default: + return fmt.Errorf("invalid resource group id type: %T", value) + } +} + +// QueryEvent represents the event listener JSON structure +type QueryEvent struct { + InstanceId string `json:"instanceId"` + ClusterName string `json:"clusterName"` + QueryCreatedEvent *QueryCreatedEvent `json:"queryCreatedEvent,omitempty"` + QueryCompletedEvent *QueryCompletedEvent `json:"queryCompletedEvent,omitempty"` + SplitCompletedEvent interface{} `json:"splitCompletedEvent,omitempty"` + Plan string `json:"plan"` + CpuTimeMillis int64 `json:"cpuTimeMillis"` + RetriedCpuTimeMillis int64 `json:"retriedCpuTimeMillis"` + WallTimeMillis int64 `json:"wallTimeMillis"` + QueuedTimeMillis int64 `json:"queuedTimeMillis"` + AnalysisTimeMillis int64 `json:"analysisTimeMillis"` +} + +type QueryCreatedEvent struct { + CreateTime PrestoTime `json:"createTime"` + Context QueryContext `json:"context"` + Metadata QueryMetadata `json:"metadata"` +} + +type ResourceEstimates struct { + ExecutionTime *Duration `json:"executionTime,omitempty"` + CpuTime *Duration `json:"cpuTime,omitempty"` + PeakMemory *int64 `json:"peakMemory,omitempty"` + PeakTaskMemory *int64 `json:"peakTaskMemory,omitempty"` +} + +type QueryCompletedEvent struct { + Metadata QueryMetadata `json:"metadata"` + Statistics QueryStatistics `json:"statistics"` + Context QueryContext `json:"context"` + IoMetadata QueryIOMetadata `json:"ioMetadata"` + FailureInfo *QueryFailureInfo `json:"failureInfo,omitempty"` + Warnings []interface{} `json:"warnings"` + QueryType *string `json:"queryType,omitempty"` + FailedTasks []string `json:"failedTasks"` + CreateTime PrestoTime `json:"createTime"` + ExecutionStartTime PrestoTime `json:"executionStartTime"` + EndTime PrestoTime `json:"endTime"` + StageStatistics []StageStatistics `json:"stageStatistics"` + OperatorStatistics []OperatorStatistics `json:"operatorStatistics"` + PlanStatisticsRead []interface{} `json:"planStatisticsRead"` + PlanStatisticsWritten []interface{} `json:"planStatisticsWritten"` + PlanNodeHash interface{} `json:"planNodeHash"` + CanonicalPlan interface{} `json:"canonicalPlan"` + StatsEquivalentPlan *string `json:"statsEquivalentPlan,omitempty"` + ExpandedQuery *string `json:"expandedQuery,omitempty"` + OptimizerInformation []interface{} `json:"optimizerInformation"` + CteInformationList []interface{} `json:"cteInformationList"` + ScalarFunctions []string `json:"scalarFunctions"` + AggregateFunctions []string `json:"aggregateFunctions"` + WindowFunctions []string `json:"windowFunctions"` +} + +type QueryMetadata struct { + QueryId string `json:"queryId"` + TransactionId *string `json:"transactionId,omitempty"` + Query string `json:"query"` + QueryState string `json:"queryState"` + Uri string `json:"uri"` + Plan *string `json:"plan,omitempty"` + JsonPlan *string `json:"jsonPlan,omitempty"` + QueryHash *string `json:"queryHash,omitempty"` +} + +type QueryStatistics struct { + CpuTime Duration `json:"cpuTime"` + WallTime Duration `json:"wallTime"` + QueuedTime Duration `json:"queuedTime"` + AnalysisTime *Duration `json:"analysisTime,omitempty"` + PeakUserMemoryBytes int64 `json:"peakUserMemoryBytes"` + PeakTotalNonRevocableMemoryBytes int64 `json:"peakTotalNonRevocableMemoryBytes"` + PeakTaskUserMemory int64 
`json:"peakTaskUserMemory"` + PeakTaskTotalMemory int64 `json:"peakTaskTotalMemory"` + PeakNodeTotalMemory int64 `json:"peakNodeTotalMemory"` + TotalBytes int64 `json:"totalBytes"` + TotalRows int64 `json:"totalRows"` + OutputPositions int64 `json:"outputPositions"` + OutputBytes int64 `json:"outputBytes"` + WrittenOutputRows int64 `json:"writtenOutputRows"` + WrittenOutputBytes int64 `json:"writtenOutputBytes"` + CumulativeMemory float64 `json:"cumulativeMemory"` + CumulativeTotalMemory float64 `json:"cumulativeTotalMemory"` + CompletedSplits int `json:"completedSplits"` +} + +type QueryContext struct { + User string `json:"user"` + Principal *string `json:"principal,omitempty"` + RemoteClientAddress *string `json:"remoteClientAddress,omitempty"` + UserAgent *string `json:"userAgent,omitempty"` + ClientInfo *string `json:"clientInfo,omitempty"` + Source *string `json:"source,omitempty"` + Catalog *string `json:"catalog,omitempty"` + Schema *string `json:"schema,omitempty"` + ResourceGroupId *ResourceGroupId `json:"resourceGroupId,omitempty"` + SessionProperties map[string]string `json:"sessionProperties"` + ServerVersion string `json:"serverVersion"` + Environment string `json:"environment"` + ClientTags []string `json:"clientTags"` + ResourceEstimates ResourceEstimates `json:"resourceEstimates"` +} + +type QueryIOMetadata struct { + Inputs []interface{} `json:"inputs"` + Output interface{} `json:"output,omitempty"` +} + +type QueryFailureInfo struct { + ErrorCode ErrorCode `json:"errorCode"` + FailureType *string `json:"failureType,omitempty"` + FailureMessage *string `json:"failureMessage,omitempty"` + FailureTask *string `json:"failureTask,omitempty"` + FailureHost *string `json:"failureHost,omitempty"` + FailuresJson string `json:"failuresJson"` +} + +type ErrorCode struct { + Code int `json:"code"` + Name string `json:"name"` + Type string `json:"type"` +} + +type StageStatistics struct { + StageId int `json:"stageId"` + StageExecutionId int `json:"stageExecutionId"` + Tasks int `json:"tasks"` + TotalScheduledTime Duration `json:"totalScheduledTime"` + TotalCpuTime Duration `json:"totalCpuTime"` + RetriedCpuTime Duration `json:"retriedCpuTime"` + TotalBlockedTime Duration `json:"totalBlockedTime"` + RawInputDataSize DataSize `json:"rawInputDataSize"` + ProcessedInputDataSize DataSize `json:"processedInputDataSize"` + PhysicalWrittenDataSize DataSize `json:"physicalWrittenDataSize"` +} + +type OperatorStatistics struct { + StageId int `json:"stageId"` + StageExecutionId int `json:"stageExecutionId"` + PipelineId int `json:"pipelineId"` + OperatorId int `json:"operatorId"` + PlanNodeId string `json:"planNodeId"` + OperatorType string `json:"operatorType"` + TotalDrivers int64 `json:"totalDrivers"` + AddInputCalls int64 `json:"addInputCalls"` + AddInputWall Duration `json:"addInputWall"` + AddInputCpu Duration `json:"addInputCpu"` + AddInputAllocation DataSize `json:"addInputAllocation"` + RawInputDataSize DataSize `json:"rawInputDataSize"` + RawInputPositions int64 `json:"rawInputPositions"` + InputDataSize DataSize `json:"inputDataSize"` + InputPositions int64 `json:"inputPositions"` + GetOutputCalls int64 `json:"getOutputCalls"` + GetOutputWall Duration `json:"getOutputWall"` + GetOutputCpu Duration `json:"getOutputCpu"` + GetOutputAllocation DataSize `json:"getOutputAllocation"` + OutputDataSize DataSize `json:"outputDataSize"` + OutputPositions int64 `json:"outputPositions"` + PhysicalWrittenDataSize DataSize `json:"physicalWrittenDataSize"` + BlockedWall Duration 
`json:"blockedWall"` + FinishCalls int64 `json:"finishCalls"` + FinishWall Duration `json:"finishWall"` + FinishCpu Duration `json:"finishCpu"` + FinishAllocation DataSize `json:"finishAllocation"` + UserMemoryReservation DataSize `json:"userMemoryReservation"` + RevocableMemoryReservation DataSize `json:"revocableMemoryReservation"` + SystemMemoryReservation DataSize `json:"systemMemoryReservation"` + PeakUserMemoryReservation DataSize `json:"peakUserMemoryReservation"` + PeakSystemMemoryReservation DataSize `json:"peakSystemMemoryReservation"` + PeakTotalMemoryReservation DataSize `json:"peakTotalMemoryReservation"` + SpilledDataSize DataSize `json:"spilledDataSize"` + Info *string `json:"info,omitempty"` +} + +// Duration represents a time duration in the format used by Presto +type Duration struct { + time.Duration +} + +func (d *Duration) UnmarshalJSON(b []byte) error { + var v interface{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + switch value := v.(type) { + case float64: + d.Duration = time.Duration(value) * time.Millisecond + return nil + case string: + // Parse Presto duration format: "43.99d", "1.5h", "30m", "45s", "100ms", etc. + var amount float64 + var unit string + n, err := fmt.Sscanf(value, "%f%s", &amount, &unit) + if err != nil || n != 2 { + // Try standard Go duration format as fallback + d.Duration, err = time.ParseDuration(value) + return err + } + + // Convert Presto units to Go duration + switch unit { + case "d": + d.Duration = time.Duration(amount * float64(24*time.Hour)) + case "h": + d.Duration = time.Duration(amount * float64(time.Hour)) + case "m": + d.Duration = time.Duration(amount * float64(time.Minute)) + case "s": + d.Duration = time.Duration(amount * float64(time.Second)) + case "ms": + d.Duration = time.Duration(amount * float64(time.Millisecond)) + case "us": + d.Duration = time.Duration(amount * float64(time.Microsecond)) + case "ns": + d.Duration = time.Duration(amount * float64(time.Nanosecond)) + default: + return fmt.Errorf("unknown duration unit: %s", unit) + } + return nil + default: + return fmt.Errorf("invalid duration type") + } +} + +// DataSize represents a data size in bytes +type DataSize struct { + Bytes int64 +} + +func (ds *DataSize) UnmarshalJSON(b []byte) error { + var v interface{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + switch value := v.(type) { + case float64: + ds.Bytes = int64(value) + return nil + case string: + // Parse string like "1.5MB", "100KB", etc. 
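+		// (Note: the unit suffix is scanned but never applied below, so "1.5MB" is stored as 1, not 1572864.)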
+ // For simplicity, we'll just try to extract the number + var size float64 + var unit string + fmt.Sscanf(value, "%f%s", &size, &unit) + ds.Bytes = int64(size) + return nil + default: + return fmt.Errorf("invalid data size type") + } +} From 1da2e3b05e69270f31fc1ef5e96528d16736746e Mon Sep 17 00:00:00 2001 From: unidevel Date: Sat, 15 Nov 2025 11:58:48 +0000 Subject: [PATCH 2/5] Add ndjson option --- cmd/loadeljson.go | 1 + cmd/loadeljson/main.go | 89 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 82 insertions(+), 8 deletions(-) diff --git a/cmd/loadeljson.go b/cmd/loadeljson.go index c295a1f..d73507a 100644 --- a/cmd/loadeljson.go +++ b/cmd/loadeljson.go @@ -42,6 +42,7 @@ func init() { loadElJsonCmd.Flags().IntVarP(&loadeljson.Parallelism, "parallel", "P", runtime.NumCPU(), "Number of parallel threads to load json files") loadElJsonCmd.Flags().StringVar(&loadeljson.InfluxCfgPath, "influx", "", "InfluxDB connection config for run recorder (optional)") loadElJsonCmd.Flags().StringVar(&loadeljson.MySQLCfgPath, "mysql", "", "MySQL connection config for event listener and run recorder (optional)") + loadElJsonCmd.Flags().BoolVar(&loadeljson.IsNDJSON, "ndjson", false, "Process files as NDJSON (newline-delimited JSON) format") } // Made with Bob diff --git a/cmd/loadeljson/main.go b/cmd/loadeljson/main.go index b4c7186..d584369 100644 --- a/cmd/loadeljson/main.go +++ b/cmd/loadeljson/main.go @@ -1,6 +1,7 @@ package loadeljson import ( + "bufio" "context" "database/sql" "encoding/json" @@ -27,6 +28,7 @@ var ( MySQLCfgPath string InfluxCfgPath string Parallelism int + IsNDJSON bool runRecorders = make([]stage.RunRecorder, 0, 3) queryResults = make([]*stage.QueryResult, 0, 8) @@ -126,7 +128,11 @@ func Run(_ *cobra.Command, args []string) { func scheduleFile(ctx context.Context, path string) { parallelismGuard <- struct{}{} runningTasks.Add(1) - go processFile(ctx, path) + if IsNDJSON { + go processNDJSONFile(ctx, path) + } else { + go processFile(ctx, path) + } } func processFile(ctx context.Context, path string) { @@ -142,30 +148,89 @@ func processFile(ctx context.Context, path string) { return } + processJSONBytes(ctx, path, bytes, 0) +} + +func processNDJSONFile(ctx context.Context, path string) { + defer func() { + // Allow another task runner to start. + <-parallelismGuard + runningTasks.Done() + }() + + file, err := os.Open(path) + if err != nil { + log.Error().Err(err).Str("path", path).Msg("failed to open NDJSON file") + return + } + defer file.Close() + + scanner := bufio.NewScanner(file) + // Increase buffer size to handle large JSON lines (default is 64KB, we set to 10MB) + const maxCapacity = 10 * 1024 * 1024 // 10MB + buf := make([]byte, maxCapacity) + scanner.Buffer(buf, maxCapacity) + + lineNum := 0 + for scanner.Scan() { + lineNum++ + if ctx.Err() != nil { + log.Info().Str("path", path).Msg("abort processing NDJSON file") + break + } + line := scanner.Bytes() + if len(line) == 0 { + continue + } + processJSONBytes(ctx, path, line, lineNum) + } + + if err := scanner.Err(); err != nil { + log.Error().Err(err).Str("path", path).Msg("error reading NDJSON file") + } +} + +func processJSONBytes(ctx context.Context, path string, jsonBytes []byte, lineNum int) { queryEvent := new(QueryEvent) // Note that this step can succeed with any valid JSON file. But we need to do some additional validation to skip // invalid event listener JSON files. 
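The lineNum > 0 branching repeated through processJSONBytes below is a candidate for a small helper. A sketch, assuming pbench/log hands out *zerolog.Event values as its call chains suggest; evLog is a hypothetical name, not part of this patch:

    import "github.com/rs/zerolog"

    // evLog tags a log event with the source path, plus the line number when the input came from an NDJSON file.
    func evLog(e *zerolog.Event, path string, lineNum int) *zerolog.Event {
        e = e.Str("path", path)
        if lineNum > 0 {
            e = e.Int("line", lineNum)
        }
        return e
    }

Each if/else pair then collapses to a single call, e.g. evLog(log.Error().Err(unmarshalErr), path, lineNum).Msg("failed to unmarshal JSON").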
- if unmarshalErr := json.Unmarshal(bytes, queryEvent); unmarshalErr != nil { - log.Error().Err(unmarshalErr).Str("path", path).Msg("failed to unmarshal JSON") + if unmarshalErr := json.Unmarshal(jsonBytes, queryEvent); unmarshalErr != nil { + if lineNum > 0 { + log.Error().Err(unmarshalErr).Str("path", path).Int("line", lineNum).Msg("failed to unmarshal JSON") + } else { + log.Error().Err(unmarshalErr).Str("path", path).Msg("failed to unmarshal JSON") + } return } // Validate that this is a QueryCompletedEvent if queryEvent.QueryCompletedEvent == nil { - log.Error().Str("path", path).Msg("no QueryCompletedEvent found in file") + if lineNum > 0 { + log.Error().Str("path", path).Int("line", lineNum).Msg("no QueryCompletedEvent found") + } else { + log.Error().Str("path", path).Msg("no QueryCompletedEvent found in file") + } return } qce := queryEvent.QueryCompletedEvent if qce.Metadata.QueryId == "" || qce.CreateTime.Time.IsZero() { - log.Error().Str("path", path).Msg("invalid QueryCompletedEvent: missing queryId or createTime") + if lineNum > 0 { + log.Error().Str("path", path).Int("line", lineNum).Msg("invalid QueryCompletedEvent: missing queryId or createTime") + } else { + log.Error().Str("path", path).Msg("invalid QueryCompletedEvent: missing queryId or createTime") + } return } // Copy the Plan from QueryEvent to QueryMetadata qce.Metadata.Plan = &queryEvent.Plan - log.Info().Str("path", path).Msg("start to process event listener file") + if lineNum > 0 { + log.Info().Str("path", path).Int("line", lineNum).Msg("start to process event listener line") + } else { + log.Info().Str("path", path).Msg("start to process event listener file") + } queryId := qce.Metadata.QueryId if RecordRun { @@ -216,7 +281,11 @@ func processFile(ctx context.Context, path string) { // Insert into MySQL if configured if mysqlDb != nil { if err := insertEventListenerData(ctx, mysqlDb, qce, queryId); err != nil { - log.Error().Err(err).Str("path", path).Msg("failed to insert event listener record") + if lineNum > 0 { + log.Error().Err(err).Str("path", path).Int("line", lineNum).Msg("failed to insert event listener record") + } else { + log.Error().Err(err).Str("path", path).Msg("failed to insert event listener record") + } return } } @@ -225,7 +294,11 @@ func processFile(ctx context.Context, path string) { r.RecordQuery(utils.GetCtxWithTimeout(time.Second*5), pseudoStage, queryResult) } - log.Info().Str("path", path).Str("query_id", queryId).Msg("success") + if lineNum > 0 { + log.Info().Str("path", path).Int("line", lineNum).Str("query_id", queryId).Msg("success") + } else { + log.Info().Str("path", path).Str("query_id", queryId).Msg("success") + } resultChan <- queryResult } From 266161eb5370808e1522854e89c46d459c8a9055 Mon Sep 17 00:00:00 2001 From: unidevel Date: Sat, 15 Nov 2025 12:02:36 +0000 Subject: [PATCH 3/5] Use replace to update existing records --- cmd/loadeljson/db.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/loadeljson/db.go b/cmd/loadeljson/db.go index e81c6ce..b0123ae 100644 --- a/cmd/loadeljson/db.go +++ b/cmd/loadeljson/db.go @@ -39,7 +39,7 @@ func insertEventListenerData(ctx context.Context, db *sql.DB, qce *QueryComplete } func insertQueryCreationInfo(ctx context.Context, db *sql.DB, qce *QueryCompletedEvent, queryId string) error { - query := `INSERT INTO presto_query_creation_info( + query := `REPLACE INTO presto_query_creation_info( query_id, query, create_time, schema_name, catalog_name, environment, user, remote_client_address, source, user_agent, 
uri, session_properties_json, server_version, client_info, resource_group_name, @@ -113,7 +113,7 @@ func insertQueryCreationInfo(ctx context.Context, db *sql.DB, qce *QueryComplete } func insertQueryPlans(ctx context.Context, db *sql.DB, qce *QueryCompletedEvent, queryId string) error { - query := `INSERT INTO presto_query_plans( + query := `REPLACE INTO presto_query_plans( query_id, query, plan, json_plan, environment, dt ) VALUES (?, ?, ?, ?, ?, ?)` @@ -139,7 +139,7 @@ func insertQueryStageStats(ctx context.Context, db *sql.DB, qce *QueryCompletedE return nil } - query := `INSERT INTO presto_query_stage_stats( + query := `REPLACE INTO presto_query_stage_stats( query_id, stage_id, stage_execution_id, tasks, total_scheduled_time_ms, total_cpu_time_ms, retried_cpu_time_ms, total_blocked_time_ms, raw_input_data_size_bytes, processed_input_data_size_bytes, physical_written_data_size_bytes, @@ -179,7 +179,7 @@ func insertQueryOperatorStats(ctx context.Context, db *sql.DB, qce *QueryComplet return nil } - query := `INSERT INTO presto_query_operator_stats( + query := `REPLACE INTO presto_query_operator_stats( query_id, stage_id, stage_execution_id, pipeline_id, operator_id, plan_node_id, operator_type, total_drivers, add_input_calls, add_input_wall_ms, add_input_cpu_ms, add_input_allocation_bytes, @@ -229,7 +229,7 @@ func insertQueryStatistics(ctx context.Context, db *sql.DB, qce *QueryCompletedE // Insert into presto_query_statistics table // This mirrors the Java implementation in PrestoQueryStatsDao.insertQueryStatistics - query := `INSERT INTO presto_query_statistics( + query := `REPLACE INTO presto_query_statistics( query_id, query, query_type, schema_name, catalog_name, environment, user, remote_client_address, source, user_agent, uri, session_properties_json, server_version, client_info, resource_group_name, From 69059439c572504af3280f1624856085879522bc Mon Sep 17 00:00:00 2001 From: unidevel Date: Sun, 16 Nov 2025 07:49:28 +0000 Subject: [PATCH 4/5] Enable recursive processing for el json files --- cmd/loadeljson/main.go | 44 ++++++++++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/cmd/loadeljson/main.go b/cmd/loadeljson/main.go index d584369..8bace4b 100644 --- a/cmd/loadeljson/main.go +++ b/cmd/loadeljson/main.go @@ -309,25 +309,45 @@ func processPath(ctx context.Context, path string) error { return err } if !stat.IsDir() { + // Skip dot files + if filepath.Base(path)[0] == '.' { + log.Debug().Str("path", path).Msg("skipping dot file") + return nil + } scheduleFile(ctx, path) return nil } - entries, err := os.ReadDir(path) - if err != nil { - return err - } - for _, entry := range entries { + + // Process directory recursively + return filepath.WalkDir(path, func(filePath string, d os.DirEntry, err error) error { + if err != nil { + log.Error().Err(err).Str("path", filePath).Msg("error walking directory") + return nil // Continue walking despite errors + } + if ctx.Err() != nil { log.Info().Msg("abort task scheduling") - break + return filepath.SkipAll } - if entry.IsDir() { - continue + + // Skip dot files and dot directories + name := d.Name() + if name[0] == '.' 
{ + if d.IsDir() { + log.Debug().Str("path", filePath).Msg("skipping dot directory") + return filepath.SkipDir + } + log.Debug().Str("path", filePath).Msg("skipping dot file") + return nil } - fullPath := filepath.Join(path, entry.Name()) - scheduleFile(ctx, fullPath) - } - return nil + + // Only schedule regular files + if !d.IsDir() { + scheduleFile(ctx, filePath) + } + + return nil + }) } func registerRunRecorder(r stage.RunRecorder) { From 553b6290eb692b7bde16b13eab2c0001d3ad7185 Mon Sep 17 00:00:00 2001 From: unidevel Date: Tue, 18 Nov 2025 11:25:57 +0000 Subject: [PATCH 5/5] Replace pbench_runs and pbench_queries --- stage/mysql_run_recorder.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/stage/mysql_run_recorder.go b/stage/mysql_run_recorder.go index 7c56aca..f507829 100644 --- a/stage/mysql_run_recorder.go +++ b/stage/mysql_run_recorder.go @@ -4,9 +4,10 @@ import ( "context" "database/sql" _ "embed" - _ "github.com/go-sql-driver/mysql" "pbench/log" "pbench/utils" + + _ "github.com/go-sql-driver/mysql" ) var ( @@ -48,7 +49,7 @@ func NewMySQLRunRecorderWithDb(db *sql.DB) *MySQLRunRecorder { } func (m *MySQLRunRecorder) Start(_ context.Context, s *Stage) error { - recordNewRun := `INSERT INTO pbench_runs (run_name, cluster_fqdn, start_time, queries_ran, failed, mismatch, comment) + recordNewRun := `REPLACE INTO pbench_runs (run_name, cluster_fqdn, start_time, queries_ran, failed, mismatch, comment) VALUES (?, ?, ?, 0, 0, 0, ?)` res, err := m.db.Exec(recordNewRun, s.States.RunName, s.States.ServerFQDN, s.States.RunStartTime, s.States.Comment) if err != nil { @@ -64,7 +65,7 @@ VALUES (?, ?, ?, 0, 0, 0, ?)` } func (m *MySQLRunRecorder) RecordQuery(_ context.Context, s *Stage, result *QueryResult) { - recordNewQuery := `INSERT INTO pbench_queries (run_id, stage_id, query_file, query_index, query_id, sequence_no, + recordNewQuery := `REPLACE INTO pbench_queries (run_id, stage_id, query_file, query_index, query_id, sequence_no, cold_run, succeeded, start_time, end_time, row_count, expected_row_count, duration_ms, info_url) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` var queryFile string if result.Query.File != nil {
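A closing note on the REPLACE INTO changes in patches 3 and 5: MySQL's REPLACE only deduplicates rows that collide on a PRIMARY KEY or UNIQUE index (it deletes the old row and inserts the new one); on a table without such a key it behaves like a plain INSERT. Idempotent reloads therefore assume constraints such as a unique query_id on the presto_query_* tables and a unique run_name on pbench_runs, which this series relies on but does not add. If run_id in pbench_runs is auto-generated, replacing a run row also produces a fresh id, so pbench_queries rows recorded under the old id keep pointing at it.

A hypothetical end-to-end invocation of the new command, assuming the root binary is named pbench and with illustrative paths:

    pbench loadeljson --ndjson --record-run \
        --mysql ~/.pbench/mysql.json \
        -n backfill_2025_11 -c "reload event listener logs" \
        /var/log/presto/events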