Skip to content

Commit

Permalink
support risingwave and refactor ddl parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
cyliu0 committed Mar 18, 2024
1 parent 5d515d5 commit d294ba0
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 20 deletions.
4 changes: 4 additions & 0 deletions ch/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ch
import (
"context"
"fmt"
"github.com/pingcap/go-tpc/pkg/util"
)

var allTables []string
Expand All @@ -15,6 +16,9 @@ func init() {
func (w *Workloader) createTableDDL(ctx context.Context, query string, tableName string, action string) error {
s := w.getState(ctx)
fmt.Printf("%s %s\n", action, tableName)
if ctx.Value("risingwave") != nil && ctx.Value("risingwave").(bool) {
query = util.ConvertToRisingWaveDDL(query)
}
if _, err := s.Conn.ExecContext(ctx, query); err != nil {
return err
}
Expand Down
12 changes: 4 additions & 8 deletions cmd/go-tpc/ch_benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,6 @@ func registerCHBenchmark(root *cobra.Command) {
"tidb_index_serial_scan_concurrency",
1,
"tidb_index_serial_scan_concurrency param for analyze jobs")
cmdPrepare.PersistentFlags().BoolVar(&chConfig.OnlyDdl,
"only-ddl",
false,
"ch prepare only ddl (default false)")
cmdPrepare.PersistentFlags().BoolVar(&chConfig.SkipDdl,
"skip-ddl",
false,
"ch prepare skip ddl (default false)")

var cmdRun = &cobra.Command{
Use: "run",
Expand Down Expand Up @@ -134,9 +126,13 @@ func executeCH(action string, openAP func() (*sql.DB, error)) {
tpccConfig.DBName = dbName
tpccConfig.Threads = threads
tpccConfig.Isolation = isolationLevel
tpccConfig.SkipDdl = skipDdl
tpccConfig.OnlyDdl = onlyDdl
chConfig.OutputStyle = outputStyle
chConfig.Driver = driver
chConfig.DBName = dbName
chConfig.OnlyDdl = onlyDdl
chConfig.SkipDdl = skipDdl
chConfig.QueryNames = strings.Split(chConfig.RawQueries, ",")
if action == "run" {
chConfig.PlanReplayerConfig.Host = apHosts[0]
Expand Down
6 changes: 6 additions & 0 deletions cmd/go-tpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ var (
connParams string
outputStyle string
targets []string
skipDdl bool
onlyDdl bool
risingwave bool

globalDB *sql.DB
globalCtx context.Context
Expand Down Expand Up @@ -209,6 +212,9 @@ func main() {
rootCmd.PersistentFlags().StringVar(&outputStyle, "output", util.OutputStylePlain, "output style, valid values can be { plain | table | json }")
rootCmd.PersistentFlags().StringSliceVar(&targets, "targets", nil, "Target database addresses")
rootCmd.PersistentFlags().MarkHidden("targets")
rootCmd.PersistentFlags().BoolVar(&skipDdl, "skip-ddl", false, "Skip DDL operations")
rootCmd.PersistentFlags().BoolVar(&onlyDdl, "only-ddl", false, "Only DDL operations")
rootCmd.PersistentFlags().BoolVar(&risingwave, "risingwave", false, "Convert DDL to support RisingWave")

cobra.EnablePrefixMatching = true

Expand Down
1 change: 1 addition & 0 deletions cmd/go-tpc/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func execute(timeoutCtx context.Context, w workload.Workloader, action string, t
count := totalCount / threads

ctx := w.InitThread(context.Background(), index)
ctx = context.WithValue(ctx, "risingwave", risingwave)
defer w.CleanupThread(ctx, index)

switch action {
Expand Down
4 changes: 2 additions & 2 deletions cmd/go-tpc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func executeTpcc(action string) {
tpccConfig.DBName = dbName
tpccConfig.Threads = threads
tpccConfig.Isolation = isolationLevel
tpccConfig.OnlyDdl = onlyDdl
tpccConfig.SkipDdl = skipDdl
var (
w workload.Workloader
err error
Expand Down Expand Up @@ -102,8 +104,6 @@ func registerTpcc(root *cobra.Command) {
"generating file, separated by ','. Valid only if output is set. If this flag is not set, generate all tables by default")
cmdPrepare.PersistentFlags().IntVar(&tpccConfig.PrepareRetryCount, "retry-count", 50, "Retry count when errors occur")
cmdPrepare.PersistentFlags().DurationVar(&tpccConfig.PrepareRetryInterval, "retry-interval", 10*time.Second, "The interval for each retry")
cmdPrepare.PersistentFlags().BoolVar(&tpccConfig.OnlyDdl, "only-ddl", false, "TPCC prepare ddl only (default false)")
cmdPrepare.PersistentFlags().BoolVar(&tpccConfig.SkipDdl, "skip-ddl", false, "TPCC prepare skip ddl (default false)")

var cmdRun = &cobra.Command{
Use: "run",
Expand Down
12 changes: 2 additions & 10 deletions cmd/go-tpc/tpch.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func executeTpch(action string) {
tpchConfig.Driver = driver
tpchConfig.DBName = dbName
tpchConfig.PrepareThreads = threads
tpchConfig.OnlyDdl = onlyDdl
tpchConfig.SkipDdl = skipDdl
tpchConfig.QueryNames = strings.Split(tpchConfig.RawQueries, ",")
if action == "prepare" && tpchConfig.OutputType == "kafka" {
if dropData {
Expand Down Expand Up @@ -132,16 +134,6 @@ func registerTpch(root *cobra.Command) {
20,
"kafka flush timeout seconds",
)
cmdPrepare.PersistentFlags().BoolVar(&tpchConfig.SkipDdl,
"skip-ddl",
false,
"tpch prepare skip ddl (default false)",
)
cmdPrepare.PersistentFlags().BoolVar(&tpchConfig.OnlyDdl,
"only-ddl",
false,
"tpch prepare only ddl (default false)",
)

var cmdRun = &cobra.Command{
Use: "run",
Expand Down
17 changes: 17 additions & 0 deletions pkg/util/risingwave.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package util

import (
"fmt"
"regexp"
)

func ConvertToRisingWaveDDL(query string) string {
fmt.Println(query)
query = regexp.MustCompile("(?i)varchar\\(\\d+\\)").ReplaceAllString(query, "VARCHAR")
query = regexp.MustCompile("(?i)numeric\\(.*?\\)").ReplaceAllString(query, "NUMERIC")
query = regexp.MustCompile("(?i)decimal\\(.*?\\)").ReplaceAllString(query, "DECIMAL")
query = regexp.MustCompile("(?i)char\\(\\d+\\)").ReplaceAllString(query, "VARCHAR")
query = regexp.MustCompile("(?i) not null").ReplaceAllString(query, "")
fmt.Println(query)
return query
}
4 changes: 4 additions & 0 deletions tpcc/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tpcc
import (
"context"
"fmt"
"github.com/pingcap/go-tpc/pkg/util"
)

const (
Expand Down Expand Up @@ -31,6 +32,9 @@ func newDDLManager(parts int, useFK bool, warehouses, partitionType int) *ddlMan
func (w *ddlManager) createTableDDL(ctx context.Context, query string, tableName string) error {
s := getTPCCState(ctx)
fmt.Printf("creating table %s\n", tableName)
if ctx.Value("risingwave") != nil && ctx.Value("risingwave").(bool) {
query = util.ConvertToRisingWaveDDL(query)
}
if _, err := s.Conn.ExecContext(ctx, query); err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions tpch/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tpch
import (
"context"
"fmt"
"github.com/pingcap/go-tpc/pkg/util"
)

var AllTables []string
Expand All @@ -14,6 +15,9 @@ func init() {
func (w *Workloader) createTableDDL(ctx context.Context, query string, tableName string, action string) error {
s := w.getState(ctx)
fmt.Printf("%s %s\n", action, tableName)
if ctx.Value("risingwave") != nil && ctx.Value("risingwave").(bool) {
query = util.ConvertToRisingWaveDDL(query)
}
if _, err := s.Conn.ExecContext(ctx, query); err != nil {
return err
}
Expand Down

0 comments on commit d294ba0

Please sign in to comment.