Commit
Merge pull request #4 from cyliu0/kafka
*: kafka support for tpch
huangjw806 authored May 16, 2023
2 parents 28afe45 + e47e068 commit 75ba1e9
Showing 8 changed files with 393 additions and 12 deletions.
3 changes: 2 additions & 1 deletion Makefile
@@ -1,5 +1,6 @@
 GOARCH := $(if $(GOARCH),$(GOARCH),amd64)
-GO=GO15VENDOREXPERIMENT="1" CGO_ENABLED=0 GOARCH=$(GOARCH) GO111MODULE=on go
+#GO=GO15VENDOREXPERIMENT="1" CGO_ENABLED=0 GOARCH=$(GOARCH) GO111MODULE=on go
+GO := go
 
 PACKAGE_LIST := go list ./...| grep -vE "cmd"
 PACKAGES := $$($(PACKAGE_LIST))
18 changes: 12 additions & 6 deletions cmd/go-tpc/tpch.go
@@ -14,12 +14,13 @@ import (
 var tpchConfig tpch.Config
 
 func executeTpch(action string) {
-	openDB()
-	defer closeDB()
-
-	if globalDB == nil {
-		util.StdErrLogger.Printf("cannot connect to the database")
-		os.Exit(1)
+	if !(tpchConfig.OutputType == "csv" || tpchConfig.OutputType == "kafka") {
+		openDB()
+		defer closeDB()
+		if globalDB == nil {
+			util.StdErrLogger.Printf("cannot connect to the database")
+			os.Exit(1)
+		}
 	}
 
 	tpchConfig.PlanReplayerConfig.Host = hosts[0]
@@ -102,6 +103,11 @@ func registerTpch(root *cobra.Command) {
 		"output-dir",
 		"",
 		"Output directory for generating file if specified")
+	cmdPrepare.PersistentFlags().StringVar(&tpchConfig.KafkaAddr,
+		"kafka-addr",
+		"",
+		"Kafka address",
+	)
 
 	var cmdRun = &cobra.Command{
 		Use:   "run",
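With this change, prepare skips opening a database connection whenever the output type is csv or kafka, since those paths write to files or a broker rather than a database. A hypothetical invocation of the new path (the output-type flag and its kafka value are inferred from the OutputType check above; only the kafka-addr flag is added by this PR):

    go-tpc tpch prepare --output-type kafka --kafka-addr 127.0.0.1:9092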
1 change: 1 addition & 0 deletions go.mod
@@ -4,6 +4,7 @@ go 1.18
 
 require (
 	github.com/HdrHistogram/hdrhistogram-go v1.0.0
+	github.com/confluentinc/confluent-kafka-go v1.9.2
 	github.com/go-sql-driver/mysql v1.5.0
 	github.com/jedib0t/go-pretty v4.3.0+incompatible
 	github.com/lib/pq v1.10.6
116 changes: 114 additions & 2 deletions go.sum

Large diffs are not rendered by default.

93 changes: 93 additions & 0 deletions pkg/sink/kafka.go
@@ -0,0 +1,93 @@
package sink

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"reflect"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// KafkaSink writes each row as a JSON message to a fixed topic and partition.
type KafkaSink struct {
	topic     string
	partition int32
	producer  *kafka.Producer
	keys      []string
}

var _ Sink = &KafkaSink{}

func NewKafkaSink(topic string, producer *kafka.Producer, partition int, keys []string) *KafkaSink {
	return &KafkaSink{
		partition: int32(partition),
		topic:     topic,
		keys:      keys,
		producer:  producer,
	}
}

// buildJsonRow pairs column names with values, unwrapping sql.Null* wrappers
// to plain values or null, and marshals the row as a JSON object.
func buildJsonRow(keys []string, values []interface{}) ([]byte, error) {
	msg := make(map[string]interface{})
	for i, key := range keys {
		ty := reflect.TypeOf(values[i])
		if ty == nil {
			msg[key] = nil
			continue
		}
		switch ty.Kind() {
		case reflect.String, reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
			reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Float32, reflect.Float64:
			msg[key] = values[i]
			continue
		}
		switch v := values[i].(type) {
		case sql.NullString:
			if v.Valid {
				msg[key] = v.String
			} else {
				msg[key] = nil
			}
		case sql.NullInt64:
			if v.Valid {
				msg[key] = v.Int64
			} else {
				msg[key] = nil
			}
		case sql.NullFloat64:
			if v.Valid {
				msg[key] = v.Float64
			} else {
				msg[key] = nil
			}
		default:
			panic(fmt.Sprintf("unsupported type: %T", v))
		}
	}
	return json.Marshal(msg)
}

func (k *KafkaSink) produce(msg []byte) error {
	return k.producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &k.topic, Partition: k.partition},
		Value:          msg,
	}, nil)
}

func (k *KafkaSink) WriteRow(ctx context.Context, values ...interface{}) error {
	row, err := buildJsonRow(k.keys, values)
	if err != nil {
		return err
	}
	return k.produce(row)
}

// Flush waits up to 10 seconds for outstanding messages to be delivered.
func (k *KafkaSink) Flush(ctx context.Context) error {
	k.producer.Flush(10 * 1000)
	return nil
}

func (k *KafkaSink) Close(ctx context.Context) error {
	k.producer.Flush(10 * 1000)
	return nil
}
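For reference, a minimal sketch of driving this sink directly (assuming a local broker at 127.0.0.1:9092; kafka.NewProducer and the bootstrap.servers key are standard confluent-kafka-go, while the topic and column list are illustrative):

	package main

	import (
		"context"
		"database/sql"

		"github.com/confluentinc/confluent-kafka-go/kafka"
		"github.com/pingcap/go-tpc/pkg/sink"
	)

	func main() {
		producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "127.0.0.1:9092"})
		if err != nil {
			panic(err)
		}
		defer producer.Close()

		s := sink.NewKafkaSink("orders", producer, 0, []string{"O_ORDERKEY", "O_COMMENT"})
		// Each WriteRow becomes one JSON message on partition 0 of "orders",
		// e.g. {"O_ORDERKEY":1,"O_COMMENT":"no comment"}.
		if err := s.WriteRow(context.Background(), int64(1), sql.NullString{String: "no comment", Valid: true}); err != nil {
			panic(err)
		}
		if err := s.Flush(context.Background()); err != nil {
			panic(err)
		}
	}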
51 changes: 51 additions & 0 deletions pkg/util/kafka.go
@@ -0,0 +1,51 @@
package util

import (
	"context"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// CreateTopics creates one topic per table name, each with the given number
// of partitions and a replication factor of 1.
func CreateTopics(ctx context.Context, kafkaAddr string, tables []string, partitions int) error {
	client, err := kafka.NewAdminClient(&kafka.ConfigMap{
		"bootstrap.servers": kafkaAddr,
	})
	if err != nil {
		return err
	}
	defer client.Close()
	topics := make([]kafka.TopicSpecification, 0)
	for _, table := range tables {
		topics = append(topics, kafka.TopicSpecification{
			Topic:             table,
			NumPartitions:     partitions,
			ReplicationFactor: 1,
		})
	}
	results, err := client.CreateTopics(ctx, topics, kafka.SetAdminOperationTimeout(10*time.Second))
	if err != nil {
		return err
	}
	for _, result := range results {
		StdErrLogger.Printf("create topic result: %s", result.String())
	}
	return nil
}

// DeleteTopics deletes the topic for each table name.
func DeleteTopics(ctx context.Context, kafkaAddr string, tables []string) error {
	client, err := kafka.NewAdminClient(&kafka.ConfigMap{
		"bootstrap.servers": kafkaAddr,
	})
	if err != nil {
		return err
	}
	defer client.Close()
	results, err := client.DeleteTopics(ctx, tables, kafka.SetAdminOperationTimeout(10*time.Second))
	if err != nil {
		return err
	}
	for _, result := range results {
		StdErrLogger.Printf("delete topic result: %s", result.String())
	}
	return nil
}
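A sketch of how these helpers might bracket a load (broker address, table list, and partition count are illustrative):

	package main

	import (
		"context"
		"log"

		"github.com/pingcap/go-tpc/pkg/util"
	)

	func main() {
		ctx := context.Background()
		tables := []string{"orders", "lineitem", "customer"}
		// Create one 4-partition topic per table; each result is logged to stderr.
		if err := util.CreateTopics(ctx, "127.0.0.1:9092", tables, 4); err != nil {
			log.Fatal(err)
		}
		// ... produce data here ...
		if err := util.DeleteTopics(ctx, "127.0.0.1:9092", tables); err != nil {
			log.Fatal(err)
		}
	}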
80 changes: 80 additions & 0 deletions tpch/loader.go
@@ -3,6 +3,7 @@ package tpch
 import (
 	"context"
 	"database/sql"
+	"github.com/confluentinc/confluent-kafka-go/kafka"
 
 	"github.com/pingcap/go-tpc/pkg/sink"
 	"github.com/pingcap/go-tpc/tpch/dbgen"
@@ -179,52 +180,131 @@ func NewOrderLoader(ctx context.Context, db *sql.DB, concurrency int) *orderLoader
 			`INSERT INTO orders (O_ORDERKEY, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, O_ORDERPRIORITY, O_CLERK, O_SHIPPRIORITY, O_COMMENT) VALUES `, 0, 0)
 	}, concurrency), ctx}}
 }
 
+func NewKafkaOrderLoader(ctx context.Context, producer *kafka.Producer, concurrency int) *orderLoader {
+	cols := []string{"O_ORDERKEY", "O_CUSTKEY", "O_ORDERSTATUS", "O_TOTALPRICE", "O_ORDERDATE", "O_ORDERPRIORITY", "O_CLERK", "O_SHIPPRIORITY", "O_COMMENT"}
+	return &orderLoader{sqlLoader{
+		sink.NewConcurrentSink(func(idx int) sink.Sink {
+			return sink.NewKafkaSink("orders", producer, idx, cols)
+		}, concurrency), ctx,
+	}}
+}
+
 func NewLineItemLoader(ctx context.Context, db *sql.DB, concurrency int) *lineItemloader {
 	return &lineItemloader{sqlLoader{
 		sink.NewConcurrentSink(func(idx int) sink.Sink {
 			return sink.NewSQLSink(db,
 				`INSERT INTO lineitem (L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT) VALUES `, 0, 0)
 		}, concurrency), ctx}}
 }
 
+func NewKafkaLineItemLoader(ctx context.Context, producer *kafka.Producer, concurrency int) *lineItemloader {
+	cols := []string{"L_ORDERKEY", "L_PARTKEY", "L_SUPPKEY", "L_LINENUMBER", "L_QUANTITY", "L_EXTENDEDPRICE", "L_DISCOUNT", "L_TAX", "L_RETURNFLAG", "L_LINESTATUS", "L_SHIPDATE", "L_COMMITDATE", "L_RECEIPTDATE", "L_SHIPINSTRUCT", "L_SHIPMODE", "L_COMMENT"}
+	return &lineItemloader{sqlLoader{
+		sink.NewConcurrentSink(func(idx int) sink.Sink {
+			return sink.NewKafkaSink("lineitem", producer, idx, cols)
+		}, concurrency), ctx,
+	}}
+}
+
 func NewCustLoader(ctx context.Context, db *sql.DB, concurrency int) *custLoader {
 	return &custLoader{sqlLoader{
 		sink.NewConcurrentSink(func(idx int) sink.Sink {
 			return sink.NewSQLSink(db,
 				`INSERT INTO customer (C_CUSTKEY, C_NAME, C_ADDRESS, C_NATIONKEY, C_PHONE, C_ACCTBAL, C_MKTSEGMENT, C_COMMENT) VALUES `, 0, 0)
 		}, concurrency), ctx}}
 }
 
+func NewKafkaCustLoader(ctx context.Context, producer *kafka.Producer, concurrency int) *custLoader {
+	cols := []string{"C_CUSTKEY", "C_NAME", "C_ADDRESS", "C_NATIONKEY", "C_PHONE", "C_ACCTBAL", "C_MKTSEGMENT", "C_COMMENT"}
+	return &custLoader{sqlLoader{
+		sink.NewConcurrentSink(func(idx int) sink.Sink {
+			return sink.NewKafkaSink("customer", producer, idx, cols)
+		}, concurrency), ctx,
+	}}
+}
+
 func NewPartLoader(ctx context.Context, db *sql.DB, concurrency int) *partLoader {
 	return &partLoader{sqlLoader{
 		sink.NewConcurrentSink(func(idx int) sink.Sink {
 			return sink.NewSQLSink(db,
 				`INSERT INTO part (P_PARTKEY, P_NAME, P_MFGR, P_BRAND, P_TYPE, P_SIZE, P_CONTAINER, P_RETAILPRICE, P_COMMENT) VALUES `, 0, 0)
 		}, concurrency), ctx}}
 }
 
+func NewKafkaPartLoader(ctx context.Context, producer *kafka.Producer, concurrency int) *partLoader {
+	cols := []string{"P_PARTKEY", "P_NAME", "P_MFGR", "P_BRAND", "P_TYPE", "P_SIZE", "P_CONTAINER", "P_RETAILPRICE", "P_COMMENT"}
+	return &partLoader{sqlLoader{
+		sink.NewConcurrentSink(func(idx int) sink.Sink {
+			return sink.NewKafkaSink("part", producer, idx, cols)
+		}, concurrency), ctx,
+	}}
+}
+
 func NewPartSuppLoader(ctx context.Context, db *sql.DB, concurrency int) *partSuppLoader {
 	return &partSuppLoader{sqlLoader{
 		sink.NewConcurrentSink(func(idx int) sink.Sink {
 			return sink.NewSQLSink(db,
 				`INSERT INTO partsupp (PS_PARTKEY, PS_SUPPKEY, PS_AVAILQTY, PS_SUPPLYCOST, PS_COMMENT) VALUES `, 0, 0)
 		}, concurrency), ctx}}
 }
 
+func NewKafkaPartSuppLoader(ctx context.Context, producer *kafka.Producer, concurrency int) *partSuppLoader {
+	cols := []string{"PS_PARTKEY", "PS_SUPPKEY", "PS_AVAILQTY", "PS_SUPPLYCOST", "PS_COMMENT"}
+	return &partSuppLoader{sqlLoader{
+		sink.NewConcurrentSink(func(idx int) sink.Sink {
+			return sink.NewKafkaSink("partsupp", producer, idx, cols)
+		}, concurrency), ctx,
+	}}
+}
+
 func NewSuppLoader(ctx context.Context, db *sql.DB, concurrency int) *suppLoader {
 	return &suppLoader{sqlLoader{
 		sink.NewConcurrentSink(func(idx int) sink.Sink {
 			return sink.NewSQLSink(db,
 				`INSERT INTO supplier (S_SUPPKEY, S_NAME, S_ADDRESS, S_NATIONKEY, S_PHONE, S_ACCTBAL, S_COMMENT) VALUES `, 0, 0)
 		}, concurrency), ctx}}
 }
 
+func NewKafkaSuppLoader(ctx context.Context, producer *kafka.Producer, concurrency int) *suppLoader {
+	cols := []string{"S_SUPPKEY", "S_NAME", "S_ADDRESS", "S_NATIONKEY", "S_PHONE", "S_ACCTBAL", "S_COMMENT"}
+	return &suppLoader{sqlLoader{
+		sink.NewConcurrentSink(func(idx int) sink.Sink {
+			return sink.NewKafkaSink("supplier", producer, idx, cols)
+		}, concurrency), ctx,
+	}}
+}
+
 func NewNationLoader(ctx context.Context, db *sql.DB, concurrency int) *nationLoader {
 	return &nationLoader{sqlLoader{
 		sink.NewConcurrentSink(func(idx int) sink.Sink {
 			return sink.NewSQLSink(db,
 				`INSERT INTO nation (N_NATIONKEY, N_NAME, N_REGIONKEY, N_COMMENT) VALUES `, 0, 0)
 		}, concurrency), ctx}}
 }
 
+func NewKafkaNationLoader(ctx context.Context, producer *kafka.Producer, concurrency int) *nationLoader {
+	cols := []string{"N_NATIONKEY", "N_NAME", "N_REGIONKEY", "N_COMMENT"}
+	return &nationLoader{sqlLoader{
+		sink.NewConcurrentSink(func(idx int) sink.Sink {
+			return sink.NewKafkaSink("nation", producer, idx, cols)
+		}, concurrency), ctx,
+	}}
+}
+
 func NewRegionLoader(ctx context.Context, db *sql.DB, concurrency int) *regionLoader {
 	return &regionLoader{sqlLoader{
 		sink.NewConcurrentSink(func(idx int) sink.Sink {
 			return sink.NewSQLSink(db,
 				`INSERT INTO region (R_REGIONKEY, R_NAME, R_COMMENT) VALUES `, 0, 0)
 		}, concurrency), ctx}}
 }
 
+func NewKafkaRegionLoader(ctx context.Context, producer *kafka.Producer, concurrency int) *regionLoader {
+	cols := []string{"R_REGIONKEY", "R_NAME", "R_COMMENT"}
+	return &regionLoader{sqlLoader{
+		sink.NewConcurrentSink(func(idx int) sink.Sink {
+			return sink.NewKafkaSink("region", producer, idx, cols)
+		}, concurrency), ctx,
+	}}
+}
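Each NewKafka* constructor mirrors its SQL counterpart: the topic is the table name, one sink is created per worker, and sink i writes to partition i. A minimal in-package sketch of wiring one up (hypothetical; kafkaAddr, ctx, and concurrency stand in for values the caller already has):

	producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": kafkaAddr})
	if err != nil {
		return err
	}
	// Rows generated for the orders table flow to the "orders" topic,
	// spread across partitions 0..concurrency-1.
	loader := NewKafkaOrderLoader(ctx, producer, concurrency)
	_ = loader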
