From 4ed5a5fc3f524e3fecd8c8b54cd646d752e4719f Mon Sep 17 00:00:00 2001 From: Song Gao Date: Tue, 28 Feb 2023 15:28:05 +0800 Subject: [PATCH] fix (#163) --- ch/workload.go | 4 +++ cmd/go-tpc/misc.go | 62 +++++++++++++++++++++-------------- pkg/plan-replayer/replayer.go | 28 +++++++++++----- pkg/workload/workload.go | 1 + rawsql/workload.go | 4 +++ tpcc/csv.go | 4 +++ tpcc/workload.go | 4 +++ tpch/workload.go | 29 +++++----------- 8 files changed, 83 insertions(+), 53 deletions(-) diff --git a/ch/workload.go b/ch/workload.go index c52ba00..b3ba64f 100644 --- a/ch/workload.go +++ b/ch/workload.go @@ -327,3 +327,7 @@ func (w *Workloader) PreparePlanReplayerDump() error { func (w *Workloader) FinishPlanReplayerDump() error { return w.PlanReplayerRunner.Finish() } + +func (w *Workloader) Exec(sql string) error { + return nil +} diff --git a/cmd/go-tpc/misc.go b/cmd/go-tpc/misc.go index 8afd5f5..3cf04a9 100644 --- a/cmd/go-tpc/misc.go +++ b/cmd/go-tpc/misc.go @@ -37,10 +37,10 @@ func checkPrepare(ctx context.Context, w workload.Workloader) { wg.Wait() } -func execute(ctx context.Context, w workload.Workloader, action string, threads, index int) error { +func execute(timeoutCtx context.Context, w workload.Workloader, action string, threads, index int) error { count := totalCount / threads - ctx = w.InitThread(ctx, index) + ctx := w.InitThread(context.Background(), index) defer w.CleanupThread(ctx, index) switch action { @@ -58,30 +58,8 @@ func execute(ctx context.Context, w workload.Workloader, action string, threads, return w.Check(ctx, index) } - enabledDumpPlanReplayer := w.IsPlanReplayerDumpEnabled() - if enabledDumpPlanReplayer { - err := w.PreparePlanReplayerDump() - if err != nil { - return err - } - defer func() { - err := w.FinishPlanReplayerDump() - if err != nil { - fmt.Printf("[%s] dump plan replayer failed, err%v\n", - time.Now().Format("2006-01-02 15:04:05"), err) - } - }() - } - for i := 0; i < count || count <= 0; i++ { err := w.Run(ctx, index) - - select { - case <-ctx.Done(): - return nil - default: - } - if err != nil { if !silence { fmt.Printf("[%s] execute %s failed, err %v\n", time.Now().Format("2006-01-02 15:04:05"), action, err) @@ -90,6 +68,11 @@ func execute(ctx context.Context, w workload.Workloader, action string, threads, return err } } + select { + case <-timeoutCtx.Done(): + return nil + default: + } } return nil @@ -115,6 +98,37 @@ func executeWorkload(ctx context.Context, w workload.Workloader, threads int, ac } } }() + if w.Name() == "tpch" && action == "run" { + err := w.Exec(`create or replace view revenue0 (supplier_no, total_revenue) as + select + l_suppkey, + sum(l_extendedprice * (1 - l_discount)) + from + lineitem + where + l_shipdate >= '1997-07-01' + and l_shipdate < date_add('1997-07-01', interval '3' month) + group by + l_suppkey;`) + if err != nil { + panic(fmt.Sprintf("a fatal occurred when preparing view data: %v", err)) + } + } + enabledDumpPlanReplayer := w.IsPlanReplayerDumpEnabled() + if enabledDumpPlanReplayer { + err := w.PreparePlanReplayerDump() + if err != nil { + fmt.Printf("[%s] prepare plan replayer failed, err%v\n", + time.Now().Format("2006-01-02 15:04:05"), err) + } + defer func() { + err = w.FinishPlanReplayerDump() + if err != nil { + fmt.Printf("[%s] dump plan replayer failed, err%v\n", + time.Now().Format("2006-01-02 15:04:05"), err) + } + }() + } for i := 0; i < threads; i++ { go func(index int) { diff --git a/pkg/plan-replayer/replayer.go b/pkg/plan-replayer/replayer.go index 4b536e8..c512ef1 100644 --- a/pkg/plan-replayer/replayer.go +++ b/pkg/plan-replayer/replayer.go @@ -24,15 +24,22 @@ type PlanReplayerConfig struct { } type PlanReplayerRunner struct { - Config PlanReplayerConfig - zf *os.File - zw struct { - sync.Mutex + sync.Mutex + prepared bool + finished bool + Config PlanReplayerConfig + zf *os.File + zw struct { writer *zip.Writer } } func (r *PlanReplayerRunner) Prepare() error { + r.Lock() + defer r.Unlock() + if r.prepared { + return nil + } if r.Config.PlanReplayerDir == "" { dir, err := os.Getwd() if err != nil { @@ -53,20 +60,27 @@ func (r *PlanReplayerRunner) Prepare() error { r.zf = zf // Create zip writer r.zw.writer = zip.NewWriter(zf) + r.prepared = true return nil } func (r *PlanReplayerRunner) Finish() error { - r.zw.Lock() + r.Lock() + defer r.Unlock() + if r.finished { + return nil + } err := r.zw.writer.Close() if err != nil { return err } - r.zw.Unlock() + r.finished = true return r.zf.Close() } func (r *PlanReplayerRunner) Dump(ctx context.Context, conn *sql.Conn, query, queryName string) error { + r.Lock() + defer r.Unlock() rows, err := conn.QueryContext(ctx, query) if err != nil { return fmt.Errorf("execute query %s failed %v", query, err) @@ -111,8 +125,6 @@ func (r *PlanReplayerRunner) writeDataIntoZW(b []byte, queryName string) error { return err } key := base64.URLEncoding.EncodeToString(k) - r.zw.Lock() - defer r.zw.Unlock() wr, err := r.zw.writer.Create(fmt.Sprintf("%v_%v_%v.zip", queryName, time.Now().Format("2006-01-02-15:04:05"), key)) if err != nil { diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index d39dda1..de129d7 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -20,4 +20,5 @@ type Workloader interface { IsPlanReplayerDumpEnabled() bool PreparePlanReplayerDump() error FinishPlanReplayerDump() error + Exec(sql string) error } diff --git a/rawsql/workload.go b/rawsql/workload.go index c30ae50..efedd1a 100644 --- a/rawsql/workload.go +++ b/rawsql/workload.go @@ -207,3 +207,7 @@ func (w *Workloader) PreparePlanReplayerDump() error { func (w *Workloader) FinishPlanReplayerDump() error { return w.PlanReplayerRunner.Finish() } + +func (w *Workloader) Exec(sql string) error { + return nil +} diff --git a/tpcc/csv.go b/tpcc/csv.go index 4dc7125..5e4e144 100644 --- a/tpcc/csv.go +++ b/tpcc/csv.go @@ -499,3 +499,7 @@ func (c *CSVWorkLoader) PreparePlanReplayerDump() error { func (c *CSVWorkLoader) FinishPlanReplayerDump() error { return nil } + +func (c *CSVWorkLoader) Exec(sql string) error { + return nil +} diff --git a/tpcc/workload.go b/tpcc/workload.go index 9248fe3..030874d 100644 --- a/tpcc/workload.go +++ b/tpcc/workload.go @@ -499,3 +499,7 @@ func (w *Workloader) PreparePlanReplayerDump() error { func (w *Workloader) FinishPlanReplayerDump() error { return nil } + +func (w *Workloader) Exec(sql string) error { + return nil +} diff --git a/tpch/workload.go b/tpch/workload.go index 6c03266..cc70894 100644 --- a/tpch/workload.go +++ b/tpch/workload.go @@ -209,27 +209,6 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error { queryName := w.cfg.QueryNames[s.queryIdx%len(w.cfg.QueryNames)] query := query(w.cfg.Driver, queryName) - - if queryName == "q15" { - _, err := w.db.Exec(`create view revenue0 (supplier_no, total_revenue) as - select - l_suppkey, - sum(l_extendedprice * (1 - l_discount)) - from - lineitem - where - l_shipdate >= '1997-07-01' - and l_shipdate < date_add('1997-07-01', interval '3' month) - group by - l_suppkey;`) - if err != nil { - return err - } - defer func() { - w.db.Exec("drop view revenue0;") - }() - } - // only for driver == mysql and EnablePlanReplayer == true if w.cfg.EnablePlanReplayer && w.cfg.Driver == "mysql" { w.dumpPlanReplayer(ctx, s, query, queryName) @@ -337,3 +316,11 @@ func (w *Workloader) PreparePlanReplayerDump() error { func (w *Workloader) FinishPlanReplayerDump() error { return w.PlanReplayerRunner.Finish() } + +func (w *Workloader) Exec(sql string) error { + _, err := w.db.Exec(sql) + if err != nil { + return err + } + return nil +}