From 956083c8a01bd3ae305237f7d4c6e7880c0fdc64 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Thu, 18 Aug 2022 15:27:08 +0800 Subject: [PATCH] support plan replayer for tpch (#131) * support plan replayer Signed-off-by: yisaer support plan replayer Signed-off-by: yisaer support plan replayer Signed-off-by: yisaer support plan replayer Signed-off-by: yisaer support plan replayer Signed-off-by: yisaer support plan replayer Signed-off-by: yisaer support plan replayer Signed-off-by: yisaer support plan replayer Signed-off-by: yisaer support plan replayer Signed-off-by: yisaer support plan replayer Signed-off-by: yisaer support plan replayer Signed-off-by: yisaer * try new test Signed-off-by: yisaer Signed-off-by: yisaer --- .gitignore | 1 + ch/workload.go | 12 ++++ cmd/go-tpc/main.go | 2 + cmd/go-tpc/misc.go | 15 ++++ cmd/go-tpc/tpch.go | 22 +++++- pkg/workload/workload.go | 4 ++ rawsql/workload.go | 12 ++++ tpcc/csv.go | 12 ++++ tpcc/workload.go | 12 ++++ tpch/check.go | 2 +- tpch/ddl.go | 2 +- tpch/workload.go | 149 +++++++++++++++++++++++++++++++++++---- 12 files changed, 228 insertions(+), 17 deletions(-) diff --git a/.gitignore b/.gitignore index 83e9f8f..59d09b7 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ bin dist/ .vscode/ .DS_Store +vendor/ diff --git a/ch/workload.go b/ch/workload.go index 9bfbfb8..0604717 100644 --- a/ch/workload.go +++ b/ch/workload.go @@ -278,3 +278,15 @@ func (w Workloader) OutputStats(ifSummaryReport bool) { func (w Workloader) DBName() string { return w.cfg.DBName } + +func (w Workloader) IsPlanReplayerDumpEnabled() bool { + return false +} + +func (w Workloader) PreparePlanReplayerDump() error { + return nil +} + +func (w Workloader) FinishPlanReplayerDump() error { + return nil +} diff --git a/cmd/go-tpc/main.go b/cmd/go-tpc/main.go index d52c905..c0a79e1 100644 --- a/cmd/go-tpc/main.go +++ b/cmd/go-tpc/main.go @@ -21,6 +21,7 @@ var ( dbName string host string port int + statusPort int user string password string threads int @@ -102,6 +103,7 @@ func main() { rootCmd.PersistentFlags().StringVarP(&user, "user", "U", "root", "Database user") rootCmd.PersistentFlags().StringVarP(&password, "password", "p", "", "Database password") rootCmd.PersistentFlags().IntVarP(&port, "port", "P", 4000, "Database port") + rootCmd.PersistentFlags().IntVarP(&statusPort, "statusPort", "S", 10080, "Database status port") rootCmd.PersistentFlags().IntVarP(&threads, "threads", "T", 1, "Thread concurrency") rootCmd.PersistentFlags().IntVarP(&acThreads, "acThreads", "t", 1, "OLAP client concurrency, only for CH-benCHmark") rootCmd.PersistentFlags().StringVarP(&driver, "driver", "d", "", "Database driver: mysql") diff --git a/cmd/go-tpc/misc.go b/cmd/go-tpc/misc.go index 2da90d8..8afd5f5 100644 --- a/cmd/go-tpc/misc.go +++ b/cmd/go-tpc/misc.go @@ -58,6 +58,21 @@ func execute(ctx context.Context, w workload.Workloader, action string, threads, return w.Check(ctx, index) } + enabledDumpPlanReplayer := w.IsPlanReplayerDumpEnabled() + if enabledDumpPlanReplayer { + err := w.PreparePlanReplayerDump() + if err != nil { + return err + } + defer func() { + err := w.FinishPlanReplayerDump() + if err != nil { + fmt.Printf("[%s] dump plan replayer failed, err%v\n", + time.Now().Format("2006-01-02 15:04:05"), err) + } + }() + } + for i := 0; i < count || count <= 0; i++ { err := w.Run(ctx, index) diff --git a/cmd/go-tpc/tpch.go b/cmd/go-tpc/tpch.go index 467f799..67ecf40 100644 --- a/cmd/go-tpc/tpch.go +++ b/cmd/go-tpc/tpch.go @@ -6,9 +6,8 @@ import ( "os" "strings" - "github.com/spf13/cobra" - "github.com/pingcap/go-tpc/tpch" + "github.com/spf13/cobra" ) var tpchConfig tpch.Config @@ -22,12 +21,14 @@ func executeTpch(action string) { os.Exit(1) } + tpchConfig.Host = host + tpchConfig.StatusPort = statusPort + tpchConfig.OutputStyle = outputStyle tpchConfig.DBName = dbName tpchConfig.PrepareThreads = threads tpchConfig.QueryNames = strings.Split(tpchConfig.RawQueries, ",") w := tpch.NewWorkloader(globalDB, &tpchConfig) - timeoutCtx, cancel := context.WithTimeout(globalCtx, totalTime) defer cancel() @@ -61,6 +62,21 @@ func registerTpch(root *cobra.Command) { false, "Check output data, only when the scale factor equals 1") + cmd.PersistentFlags().BoolVar(&tpchConfig.EnablePlanReplayer, + "use-plan-replayer", + false, + "Use Plan Replayer to dump stats and variables before running queries") + + cmd.PersistentFlags().StringVar(&tpchConfig.PlanReplayerDir, + "plan-replayer-dir", + "", + "Dir of Plan Replayer file dumps") + + cmd.PersistentFlags().StringVar(&tpchConfig.PlanReplayerFileName, + "plan-replayer-file", + "", + "Name of plan Replayer file dumps") + var cmdPrepare = &cobra.Command{ Use: "prepare", Short: "Prepare data for the workload", diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index 7fe577a..d39dda1 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -16,4 +16,8 @@ type Workloader interface { Check(ctx context.Context, threadID int) error OutputStats(ifSummaryReport bool) DBName() string + + IsPlanReplayerDumpEnabled() bool + PreparePlanReplayerDump() error + FinishPlanReplayerDump() error } diff --git a/rawsql/workload.go b/rawsql/workload.go index cb0c583..e7e4f3d 100644 --- a/rawsql/workload.go +++ b/rawsql/workload.go @@ -171,3 +171,15 @@ func (w *Workloader) Cleanup(ctx context.Context, threadID int) error { func (w *Workloader) Check(ctx context.Context, threadID int) error { panic("not implemented") // TODO: Implement } + +func (w Workloader) IsPlanReplayerDumpEnabled() bool { + return false +} + +func (w Workloader) PreparePlanReplayerDump() error { + return nil +} + +func (w Workloader) FinishPlanReplayerDump() error { + return nil +} diff --git a/tpcc/csv.go b/tpcc/csv.go index fedd607..06dfb0d 100644 --- a/tpcc/csv.go +++ b/tpcc/csv.go @@ -487,3 +487,15 @@ func (c *CSVWorkLoader) loadOrderLine(ctx context.Context, warehouse int, return l.Flush(ctx) } + +func (c *CSVWorkLoader) IsPlanReplayerDumpEnabled() bool { + return false +} + +func (c *CSVWorkLoader) PreparePlanReplayerDump() error { + return nil +} + +func (c *CSVWorkLoader) FinishPlanReplayerDump() error { + return nil +} diff --git a/tpcc/workload.go b/tpcc/workload.go index 02ce9b6..c3624af 100644 --- a/tpcc/workload.go +++ b/tpcc/workload.go @@ -472,3 +472,15 @@ func closeStmts(stmts map[string]*sql.Stmt) { stmt.Close() } } + +func (w *Workloader) IsPlanReplayerDumpEnabled() bool { + return false +} + +func (w *Workloader) PreparePlanReplayerDump() error { + return nil +} + +func (w *Workloader) FinishPlanReplayerDump() error { + return nil +} diff --git a/tpch/check.go b/tpch/check.go index 067144a..f3dffab 100644 --- a/tpch/check.go +++ b/tpch/check.go @@ -51,7 +51,7 @@ var queryColPrecisions = map[string][]precision{ "q22": {num, cnt, sum}, } -func (w Workloader) scanQueryResult(queryName string, rows *sql.Rows) error { +func (w *Workloader) scanQueryResult(queryName string, rows *sql.Rows) error { var got [][]string cols, err := rows.Columns() diff --git a/tpch/ddl.go b/tpch/ddl.go index 3709118..22b1a12 100644 --- a/tpch/ddl.go +++ b/tpch/ddl.go @@ -28,7 +28,7 @@ func (w *Workloader) createTableDDL(ctx context.Context, query string, tableName } // createTables creates tables schema. -func (w Workloader) createTables(ctx context.Context) error { +func (w *Workloader) createTables(ctx context.Context) error { query := ` CREATE TABLE IF NOT EXISTS nation ( N_NATIONKEY BIGINT NOT NULL, diff --git a/tpch/workload.go b/tpch/workload.go index 64ce64c..95157e0 100644 --- a/tpch/workload.go +++ b/tpch/workload.go @@ -1,13 +1,20 @@ package tpch import ( + "archive/zip" "context" "database/sql" + "encoding/base64" "fmt" + "io/ioutil" + "math/rand" + "net/http" "os" "path" + "path/filepath" "sort" "strings" + "sync" "time" "github.com/pingcap/go-tpc/pkg/measurement" @@ -39,6 +46,12 @@ type Config struct { AnalyzeTable analyzeConfig ExecExplainAnalyze bool PrepareThreads int + Host string + StatusPort int + + EnablePlanReplayer bool + PlanReplayerDir string + PlanReplayerFileName string // for prepare command only OutputType string @@ -60,11 +73,17 @@ type Workloader struct { // stats measurement *measurement.Measurement + + zf *os.File + zw struct { + sync.Mutex + writer *zip.Writer + } } // NewWorkloader new work loader func NewWorkloader(db *sql.DB, cfg *Config) workload.Workloader { - return Workloader{ + return &Workloader{ db: db, cfg: cfg, measurement: measurement.NewMeasurement(func(m *measurement.Measurement) { @@ -86,12 +105,12 @@ func (w *Workloader) updateState(ctx context.Context) { } // Name return workloader name -func (w Workloader) Name() string { +func (w *Workloader) Name() string { return "tpch" } // InitThread inits thread -func (w Workloader) InitThread(ctx context.Context, threadID int) context.Context { +func (w *Workloader) InitThread(ctx context.Context, threadID int) context.Context { s := &tpchState{ queryIdx: threadID % len(w.cfg.QueryNames), TpcState: workload.NewTpcState(ctx, w.db), @@ -102,13 +121,13 @@ func (w Workloader) InitThread(ctx context.Context, threadID int) context.Contex } // CleanupThread cleans up thread -func (w Workloader) CleanupThread(ctx context.Context, threadID int) { +func (w *Workloader) CleanupThread(ctx context.Context, threadID int) { s := w.getState(ctx) s.Conn.Close() } // Prepare prepares data -func (w Workloader) Prepare(ctx context.Context, threadID int) error { +func (w *Workloader) Prepare(ctx context.Context, threadID int) error { if threadID != 0 { return nil } @@ -163,7 +182,7 @@ func (w Workloader) Prepare(ctx context.Context, threadID int) error { return nil } -func (w Workloader) analyzeTables(ctx context.Context, acfg analyzeConfig) error { +func (w *Workloader) analyzeTables(ctx context.Context, acfg analyzeConfig) error { s := w.getState(ctx) for _, tbl := range allTables { fmt.Printf("analyzing table %s\n", tbl) @@ -176,17 +195,24 @@ func (w Workloader) analyzeTables(ctx context.Context, acfg analyzeConfig) error } // CheckPrepare checks prepare -func (w Workloader) CheckPrepare(ctx context.Context, threadID int) error { +func (w *Workloader) CheckPrepare(ctx context.Context, threadID int) error { return nil } // Run runs workload -func (w Workloader) Run(ctx context.Context, threadID int) error { +func (w *Workloader) Run(ctx context.Context, threadID int) error { s := w.getState(ctx) defer w.updateState(ctx) queryName := w.cfg.QueryNames[s.queryIdx%len(w.cfg.QueryNames)] query := queries[queryName] + if w.cfg.EnablePlanReplayer { + err := w.dumpPlanReplayer(ctx, s, query, queryName) + if err != nil { + return err + } + } + if w.cfg.ExecExplainAnalyze { query = strings.Replace(query, "/*PLACEHOLDER*/", "explain analyze", 1) } @@ -217,7 +243,7 @@ func (w Workloader) Run(ctx context.Context, threadID int) error { } // Cleanup cleans up workloader -func (w Workloader) Cleanup(ctx context.Context, threadID int) error { +func (w *Workloader) Cleanup(ctx context.Context, threadID int) error { if threadID != 0 { return nil } @@ -225,7 +251,7 @@ func (w Workloader) Cleanup(ctx context.Context, threadID int) error { } // Check checks data -func (w Workloader) Check(ctx context.Context, threadID int) error { +func (w *Workloader) Check(ctx context.Context, threadID int) error { return nil } @@ -255,11 +281,110 @@ func outputRtMeasurement(outputStyle string, prefix string, opMeasurement map[st } } -func (w Workloader) OutputStats(ifSummaryReport bool) { +func (w *Workloader) OutputStats(ifSummaryReport bool) { w.measurement.Output(ifSummaryReport, w.cfg.OutputStyle, outputRtMeasurement) } // DBName returns the name of test db. -func (w Workloader) DBName() string { +func (w *Workloader) DBName() string { return w.cfg.DBName } + +func (w *Workloader) dumpPlanReplayer(ctx context.Context, s *tpchState, query, queryName string) error { + query = strings.Replace(query, "/*PLACEHOLDER*/", "plan replayer dump explain", 1) + rows, err := s.Conn.QueryContext(ctx, query) + if err != nil { + return fmt.Errorf("execute query %s failed %v", query, err) + } + defer rows.Close() + var token string + for rows.Next() { + err := rows.Scan(&token) + if err != nil { + return fmt.Errorf("execute query %s failed %v", query, err) + } + } + // TODO: support tls + r, err := http.Get(fmt.Sprintf("http://%s:%v/plan_replayer/dump/%s", w.cfg.Host, w.cfg.StatusPort, token)) + if err != nil { + return fmt.Errorf("get plan replayer for query %s failed %v", queryName, err) + } + defer r.Body.Close() + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return fmt.Errorf("get plan replayer for query %s failed %v", queryName, err) + } + err = w.writeDataIntoZW(b, queryName) + if err != nil { + return fmt.Errorf("dump plan replayer for %s failed %v", queryName, err) + } + return nil +} + +func (w *Workloader) IsPlanReplayerDumpEnabled() bool { + return w.cfg.EnablePlanReplayer +} + +func (w *Workloader) PreparePlanReplayerDump() error { + if w.cfg.PlanReplayerDir == "" { + dir, err := os.Getwd() + if err != nil { + return err + } + w.cfg.PlanReplayerDir = dir + } + if w.cfg.PlanReplayerFileName == "" { + w.cfg.PlanReplayerFileName = fmt.Sprintf("plan_replayer_%s_%s", + w.Name(), time.Now().Format("2006-01-02-15:04:05")) + } + + fileName := fmt.Sprintf("%s.zip", w.cfg.PlanReplayerFileName) + zf, err := os.Create(filepath.Join(w.cfg.PlanReplayerDir, fileName)) + if err != nil { + return err + } + w.zf = zf + // Create zip writer + w.zw.writer = zip.NewWriter(zf) + return nil +} + +func (w *Workloader) FinishPlanReplayerDump() error { + w.zw.Lock() + err := w.zw.writer.Close() + if err != nil { + return err + } + w.zw.Unlock() + + return w.zf.Close() +} + +// writeDataIntoZW will dump query stats information by following format in zip +/* + |-q1_time.zip + |-q2_time.zip + |-q3_time.zip + |-... +*/ +func (w *Workloader) writeDataIntoZW(b []byte, queryName string) error { + k := make([]byte, 16) + //nolint: gosec + _, err := rand.Read(k) + if err != nil { + return err + } + key := base64.URLEncoding.EncodeToString(k) + w.zw.Lock() + defer w.zw.Unlock() + wr, err := w.zw.writer.Create(fmt.Sprintf("%v_%v_%v.zip", + queryName, time.Now().Format("2006-01-02-15:04:05"), key)) + if err != nil { + return err + } + _, err = wr.Write(b) + if err != nil { + return err + } + return nil +}