Skip to content

Commit

Permalink
Add explain support for CHbenchmark (#164)
Browse files Browse the repository at this point in the history
Signed-off-by: Wish <[email protected]>
  • Loading branch information
breezewish authored Mar 29, 2023
1 parent 4ed5a5f commit 0c29c49
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 21 deletions.
39 changes: 31 additions & 8 deletions ch/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ type analyzeConfig struct {

// Config is the configuration for ch workload
type Config struct {
Driver string
DBName string
RawQueries string
QueryNames []string
TiFlashReplica int
AnalyzeTable analyzeConfig
RefreshConnWait time.Duration
Driver string
DBName string
RawQueries string
QueryNames []string
TiFlashReplica int
AnalyzeTable analyzeConfig
ExecExplainAnalyze bool
RefreshConnWait time.Duration

EnablePlanReplayer bool
PlanReplayerConfig replayer.PlanReplayerConfig
Expand Down Expand Up @@ -220,16 +221,38 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error {
w.dumpPlanReplayer(ctx, s, query, queryName)
}

if w.cfg.ExecExplainAnalyze {
query = strings.Replace(query, "/*PLACEHOLDER*/", "explain analyze", 1)
}
start := time.Now()
rows, err := s.Conn.QueryContext(ctx, query)
w.measurement.Measure(queryName, time.Now().Sub(start), err)
defer w.measurement.Measure(queryName, time.Now().Sub(start), err)
if err != nil {
return fmt.Errorf("execute query %s failed %v", queryName, err)
}
defer rows.Close()

if w.cfg.ExecExplainAnalyze {
table, err := util.RenderExplainAnalyze(rows)
if err != nil {
return err
}
util.StdErrLogger.Printf("explain analyze result of query %s (takes %s):\n%s\n", queryName, time.Now().Sub(start), table)
return nil
}
if err := w.drainQueryResult(queryName, rows); err != nil {
return fmt.Errorf("execute query %s failed %v", queryName, err)
}

return nil
}

func (w *Workloader) drainQueryResult(queryName string, rows *sql.Rows) error {
for rows.Next() {
}
return rows.Err()
}

// Cleanup cleans up workloader
func (w *Workloader) Cleanup(ctx context.Context, threadID int) error {
return nil
Expand Down
5 changes: 5 additions & 0 deletions cmd/go-tpc/ch_benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ func registerCHBenchmark(root *cobra.Command) {
"",
"Name of plan Replayer file dumps")

cmdRun.PersistentFlags().BoolVar(&chConfig.ExecExplainAnalyze,
"use-explain",
false,
"execute explain analyze")

cmdRun.PersistentFlags().IntSliceVar(&tpccConfig.Weight, "weight", []int{45, 43, 4, 4, 4}, "Weight for NewOrder, Payment, OrderStatus, Delivery, StockLevel")
cmdRun.Flags().StringVar(&apConnParams, "ap-conn-params", "", "Connection parameters for analytical processing")
cmdRun.Flags().StringSliceVar(&apHosts, "ap-host", nil, "Database host for analytical processing")
Expand Down
7 changes: 4 additions & 3 deletions cmd/go-tpc/rawsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/pingcap/go-tpc/pkg/util"
"github.com/pingcap/go-tpc/rawsql"
"github.com/spf13/cobra"
)
Expand All @@ -29,7 +30,7 @@ func registerRawsql(root *cobra.Command) {
Short: "Run workload",
Run: func(cmd *cobra.Command, args []string) {
if len(queryFiles) == 0 {
fmt.Fprintln(os.Stderr, "empty query files")
util.StdErrLogger.Printf("empty query files")
os.Exit(1)
}

Expand Down Expand Up @@ -74,7 +75,7 @@ func execRawsql(action string) {

// if globalDB == nil
if globalDB == nil {
fmt.Fprintln(os.Stderr, "cannot connect to the database")
util.StdErrLogger.Printf("cannot connect to the database")
os.Exit(1)
}

Expand All @@ -89,7 +90,7 @@ func execRawsql(action string) {
for i, filename := range rawsqlConfig.QueryNames {
queryData, err := ioutil.ReadFile(filename)
if err != nil {
fmt.Fprintf(os.Stderr, "read file: %s, err: %v\n", filename, err)
util.StdErrLogger.Printf("read file: %s, err: %v\n", filename, err)
os.Exit(1)
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/go-tpc/tpch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"strings"

"github.com/pingcap/go-tpc/pkg/util"
"github.com/pingcap/go-tpc/tpch"
"github.com/spf13/cobra"
)
Expand All @@ -17,7 +18,7 @@ func executeTpch(action string) {
defer closeDB()

if globalDB == nil {
fmt.Fprintln(os.Stderr, "cannot connect to the database")
util.StdErrLogger.Printf("cannot connect to the database")
os.Exit(1)
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/util/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"log"
"os"
"strings"

Expand All @@ -17,6 +18,13 @@ const (
OutputStyleJson = "json"
)

// This logger is goroutine-safe.
var StdErrLogger *log.Logger

func init() {
StdErrLogger = log.New(os.Stderr, "", 0)
}

func RenderString(format string, headers []string, values [][]string) {
if len(values) == 0 {
return
Expand Down
3 changes: 1 addition & 2 deletions rawsql/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"fmt"
"os"
"sort"
"strings"
"time"
Expand Down Expand Up @@ -122,7 +121,7 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error {
if err != nil {
return err
}
fmt.Fprintf(os.Stderr, "explain analyze result of query %s:\n%s\n", queryName, table)
util.StdErrLogger.Printf("explain analyze result of query %s:\n%s\n", queryName, table)
return nil
}

Expand Down
10 changes: 3 additions & 7 deletions tpch/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,22 +219,18 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error {
}
start := time.Now()
rows, err := s.Conn.QueryContext(ctx, query)
if err != nil {
return fmt.Errorf("execute query %s failed %v", query, err)
}
defer rows.Close()
w.measurement.Measure(queryName, time.Now().Sub(start), err)

defer w.measurement.Measure(queryName, time.Now().Sub(start), err)
if err != nil {
return fmt.Errorf("execute %s failed %v", queryName, err)
}
defer rows.Close()

if w.cfg.ExecExplainAnalyze {
table, err := util.RenderExplainAnalyze(rows)
if err != nil {
return err
}
fmt.Fprintf(os.Stderr, "explain analyze result of query %s:\n%s\n", queryName, table)
util.StdErrLogger.Printf("explain analyze result of query %s (takes %s):\n%s\n", queryName, time.Now().Sub(start), table)
return nil
}
if err := w.scanQueryResult(queryName, rows); err != nil {
Expand Down

0 comments on commit 0c29c49

Please sign in to comment.