diff --git a/ch/workload.go b/ch/workload.go index b3ba64f..6b835c5 100644 --- a/ch/workload.go +++ b/ch/workload.go @@ -30,13 +30,14 @@ type analyzeConfig struct { // Config is the configuration for ch workload type Config struct { - Driver string - DBName string - RawQueries string - QueryNames []string - TiFlashReplica int - AnalyzeTable analyzeConfig - RefreshConnWait time.Duration + Driver string + DBName string + RawQueries string + QueryNames []string + TiFlashReplica int + AnalyzeTable analyzeConfig + ExecExplainAnalyze bool + RefreshConnWait time.Duration EnablePlanReplayer bool PlanReplayerConfig replayer.PlanReplayerConfig @@ -220,16 +221,38 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error { w.dumpPlanReplayer(ctx, s, query, queryName) } + if w.cfg.ExecExplainAnalyze { + query = strings.Replace(query, "/*PLACEHOLDER*/", "explain analyze", 1) + } start := time.Now() rows, err := s.Conn.QueryContext(ctx, query) - w.measurement.Measure(queryName, time.Now().Sub(start), err) + defer w.measurement.Measure(queryName, time.Now().Sub(start), err) if err != nil { return fmt.Errorf("execute query %s failed %v", queryName, err) } defer rows.Close() + + if w.cfg.ExecExplainAnalyze { + table, err := util.RenderExplainAnalyze(rows) + if err != nil { + return err + } + util.StdErrLogger.Printf("explain analyze result of query %s (takes %s):\n%s\n", queryName, time.Now().Sub(start), table) + return nil + } + if err := w.drainQueryResult(queryName, rows); err != nil { + return fmt.Errorf("execute query %s failed %v", queryName, err) + } + return nil } +func (w *Workloader) drainQueryResult(queryName string, rows *sql.Rows) error { + for rows.Next() { + } + return rows.Err() +} + // Cleanup cleans up workloader func (w *Workloader) Cleanup(ctx context.Context, threadID int) error { return nil diff --git a/cmd/go-tpc/ch_benchmark.go b/cmd/go-tpc/ch_benchmark.go index 660fb85..a912041 100644 --- a/cmd/go-tpc/ch_benchmark.go +++ b/cmd/go-tpc/ch_benchmark.go @@ -102,6 +102,11 @@ func registerCHBenchmark(root *cobra.Command) { "", "Name of plan Replayer file dumps") + cmdRun.PersistentFlags().BoolVar(&chConfig.ExecExplainAnalyze, + "use-explain", + false, + "execute explain analyze") + cmdRun.PersistentFlags().IntSliceVar(&tpccConfig.Weight, "weight", []int{45, 43, 4, 4, 4}, "Weight for NewOrder, Payment, OrderStatus, Delivery, StockLevel") cmdRun.Flags().StringVar(&apConnParams, "ap-conn-params", "", "Connection parameters for analytical processing") cmdRun.Flags().StringSliceVar(&apHosts, "ap-host", nil, "Database host for analytical processing") diff --git a/cmd/go-tpc/rawsql.go b/cmd/go-tpc/rawsql.go index 418fb93..7d61ad1 100644 --- a/cmd/go-tpc/rawsql.go +++ b/cmd/go-tpc/rawsql.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/pingcap/go-tpc/pkg/util" "github.com/pingcap/go-tpc/rawsql" "github.com/spf13/cobra" ) @@ -29,7 +30,7 @@ func registerRawsql(root *cobra.Command) { Short: "Run workload", Run: func(cmd *cobra.Command, args []string) { if len(queryFiles) == 0 { - fmt.Fprintln(os.Stderr, "empty query files") + util.StdErrLogger.Printf("empty query files") os.Exit(1) } @@ -74,7 +75,7 @@ func execRawsql(action string) { // if globalDB == nil if globalDB == nil { - fmt.Fprintln(os.Stderr, "cannot connect to the database") + util.StdErrLogger.Printf("cannot connect to the database") os.Exit(1) } @@ -89,7 +90,7 @@ func execRawsql(action string) { for i, filename := range rawsqlConfig.QueryNames { queryData, err := ioutil.ReadFile(filename) if err != nil { - fmt.Fprintf(os.Stderr, "read file: %s, err: %v\n", filename, err) + util.StdErrLogger.Printf("read file: %s, err: %v\n", filename, err) os.Exit(1) } diff --git a/cmd/go-tpc/tpch.go b/cmd/go-tpc/tpch.go index f35a678..89db3a8 100644 --- a/cmd/go-tpc/tpch.go +++ b/cmd/go-tpc/tpch.go @@ -6,6 +6,7 @@ import ( "os" "strings" + "github.com/pingcap/go-tpc/pkg/util" "github.com/pingcap/go-tpc/tpch" "github.com/spf13/cobra" ) @@ -17,7 +18,7 @@ func executeTpch(action string) { defer closeDB() if globalDB == nil { - fmt.Fprintln(os.Stderr, "cannot connect to the database") + util.StdErrLogger.Printf("cannot connect to the database") os.Exit(1) } diff --git a/pkg/util/output.go b/pkg/util/output.go index 0295426..8f214db 100644 --- a/pkg/util/output.go +++ b/pkg/util/output.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "log" "os" "strings" @@ -17,6 +18,13 @@ const ( OutputStyleJson = "json" ) +// This logger is goroutine-safe. +var StdErrLogger *log.Logger + +func init() { + StdErrLogger = log.New(os.Stderr, "", 0) +} + func RenderString(format string, headers []string, values [][]string) { if len(values) == 0 { return diff --git a/rawsql/workload.go b/rawsql/workload.go index efedd1a..c430e1d 100644 --- a/rawsql/workload.go +++ b/rawsql/workload.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "fmt" - "os" "sort" "strings" "time" @@ -122,7 +121,7 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error { if err != nil { return err } - fmt.Fprintf(os.Stderr, "explain analyze result of query %s:\n%s\n", queryName, table) + util.StdErrLogger.Printf("explain analyze result of query %s:\n%s\n", queryName, table) return nil } diff --git a/tpch/workload.go b/tpch/workload.go index cc70894..904ef57 100644 --- a/tpch/workload.go +++ b/tpch/workload.go @@ -219,22 +219,18 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error { } start := time.Now() rows, err := s.Conn.QueryContext(ctx, query) - if err != nil { - return fmt.Errorf("execute query %s failed %v", query, err) - } - defer rows.Close() - w.measurement.Measure(queryName, time.Now().Sub(start), err) - + defer w.measurement.Measure(queryName, time.Now().Sub(start), err) if err != nil { return fmt.Errorf("execute %s failed %v", queryName, err) } + defer rows.Close() if w.cfg.ExecExplainAnalyze { table, err := util.RenderExplainAnalyze(rows) if err != nil { return err } - fmt.Fprintf(os.Stderr, "explain analyze result of query %s:\n%s\n", queryName, table) + util.StdErrLogger.Printf("explain analyze result of query %s (takes %s):\n%s\n", queryName, time.Now().Sub(start), table) return nil } if err := w.scanQueryResult(queryName, rows); err != nil {