1
1
package main
2
2
3
3
import (
4
+ "context"
4
5
"database/sql"
5
6
"encoding/json"
6
7
"fmt"
7
8
"io/ioutil"
8
9
"math/rand"
10
+ "os"
9
11
"strconv"
12
+ "strings"
10
13
"sync"
14
+ "sync/atomic"
11
15
"time"
12
16
17
+ "github.com/namsral/flag"
18
+
13
19
_ "github.com/go-sql-driver/mysql"
14
20
)
15
21
22
+ type Substitution struct {
23
+ Key string `json:"key"`
24
+ Min int `json:"min"`
25
+ Max int `json:"max"`
26
+ }
27
+
16
28
type Config struct {
17
- RequestsPerSecond int `json:"requestsPerSecond"`
18
- Queries []string `json:"queries"`
19
- ConnectionString string `json:"connectionString"`
20
- PrintLogs bool `json:"printLogs"`
21
- TimeToRunInSeconds int `json:"timeToRunInSeconds"`
29
+ RequestsPerSecond int `json:"requestsPerSecond"`
30
+ Queries []string `json:"queries"`
31
+ ConnectionString string `json:"connectionString"`
32
+ PrintLogs bool `json:"printLogs"`
33
+ TimeToRunInSeconds int `json:"timeToRunInSeconds"`
34
+ PoolConnections int `json:"poolConnections"`
35
+ DryRun bool `json:"dryRun"`
36
+ QueryTimeout int `json:"queryTimeout"`
37
+ ConnectionLifetime time.Duration `json:"connectionLifeTime"`
38
+ Substitution []Substitution `json:"substitution"`
22
39
}
23
40
24
- var config Config
41
+ var config = Config {
42
+ RequestsPerSecond : 100 ,
43
+ Queries : make ([]string , 0 ),
44
+ ConnectionString : "" ,
45
+ PrintLogs : true ,
46
+ TimeToRunInSeconds : 10 ,
47
+ PoolConnections : 10 ,
48
+ DryRun : true ,
49
+ QueryTimeout : 10 ,
50
+ ConnectionLifetime : 0 ,
51
+ Substitution : []Substitution {},
52
+ }
53
+ var configFilePath string
25
54
var db * sql.DB
26
55
var queryTimesInMS []int
56
+ var ops int64 = 0
57
+ var latency int64 = 0
27
58
28
59
var wg sync.WaitGroup
29
60
30
61
func main () {
31
- loadConfig ()
62
+ flag .StringVar (& configFilePath , "configFile" , "/app/config.json" , "path to config file" )
63
+ flag .Parse ()
64
+ go printQps ()
65
+ loadConfig (configFilePath )
32
66
openMemSQLConnection ()
33
67
dispatchQueries ()
34
- displayAverageQueryTime ()
68
+ }
69
+
70
+ func printQps () {
71
+ ticker := time .NewTicker (5 * time .Second )
72
+ for range ticker .C {
73
+ loadedOps := atomic .LoadInt64 (& ops )
74
+ atomic .StoreInt64 (& ops , 0 )
75
+ loadedLatency := atomic .LoadInt64 (& latency )
76
+ atomic .StoreInt64 (& latency , 0 )
77
+ fmt .Println ("qps: " , loadedOps / 5 , ", average latency: " , time .Duration (loadedLatency / loadedOps ))
78
+
79
+ }
80
+
81
+ }
82
+
83
+ func waitTimeout (wg * sync.WaitGroup , timeout time.Duration ) bool {
84
+ c := make (chan struct {})
85
+ go func () {
86
+ defer close (c )
87
+ wg .Wait ()
88
+ }()
89
+ select {
90
+ case <- c :
91
+ return false // completed normally
92
+ case <- time .After (timeout ):
93
+ return true // timed out
94
+ }
35
95
}
36
96
37
97
func openMemSQLConnection () {
38
98
var err error
39
99
db , err = sql .Open ("mysql" , config .ConnectionString )
100
+ db .SetConnMaxLifetime (config .ConnectionLifetime )
101
+ db .SetMaxIdleConns (config .PoolConnections )
102
+ db .SetMaxOpenConns (config .PoolConnections )
40
103
41
104
if err != nil {
42
105
panic (err .Error ())
@@ -48,45 +111,30 @@ func openMemSQLConnection() {
48
111
panic (err .Error ())
49
112
}
50
113
51
- fmt .Println ("Connection succeeded" )
52
- }
53
-
54
- func trackFuncTime (start time.Time ) {
55
- elapsed := time .Since (start )
56
- queryTimesInMS = append (queryTimesInMS , int (elapsed / time .Millisecond ))
57
-
58
- if config .PrintLogs {
59
- fmt .Println (elapsed )
60
- }
114
+ fmt .Println ("Connection succeeded with pool size" , config .PoolConnections )
61
115
}
62
116
63
117
func executeQuery (query string ) {
64
118
defer wg .Done ()
65
- defer trackFuncTime (time .Now ())
119
+ ctx , cancel := context .WithTimeout (context .Background (), time .Duration (config .QueryTimeout )* time .Second )
120
+ defer cancel ()
66
121
67
- _ , err := db .Exec (query )
68
-
69
- if err != nil {
70
- fmt .Println (err .Error ())
122
+ for _ , subs := range config .Substitution {
123
+ query = strings .Replace (query , subs .Key , strconv .Itoa (rand .Intn (subs .Max - subs .Min )+ subs .Min ), 1 )
71
124
}
72
125
73
126
if config .PrintLogs {
74
- fmt .Println ("Executed " + query )
127
+ fmt .Println ("Executed " , query )
75
128
}
76
- }
77
-
78
- func displayAverageQueryTime () {
79
- if config . PrintLogs {
80
- var totalvalues int
81
-
82
- for _ , val := range queryTimesInMS {
83
- totalvalues += val
129
+ if config . DryRun == false {
130
+ start := time . Now ()
131
+ _ , err := db . ExecContext ( ctx , query )
132
+ if err == nil {
133
+ atomic . AddInt64 ( & ops , 1 )
134
+ atomic . AddInt64 ( & latency , int64 ( time . Since ( start )))
135
+ } else {
136
+ fmt . Fprintf ( os . Stderr , "query err: %s \n " , err . Error ())
84
137
}
85
-
86
- var count = len (queryTimesInMS )
87
- sum := totalvalues / count
88
-
89
- fmt .Println ("Average time: " + strconv .Itoa (sum ) + "ms" )
90
138
}
91
139
}
92
140
@@ -97,23 +145,44 @@ func dispatchQueries() {
97
145
shouldEnd := start .Add (time .Second * time .Duration (config .TimeToRunInSeconds ))
98
146
99
147
for time .Now ().Before (shouldEnd ) {
100
- for i := 0 ; i < config .RequestsPerSecond ; i ++ {
101
- wg .Add (1 )
102
- go executeQuery (config .Queries [rand .Intn (len (config .Queries ))])
148
+ l := len (config .Queries )
149
+ for i := 0 ; i < config .RequestsPerSecond / l ; i ++ {
150
+ wg .Add (l )
151
+ for q := 0 ; q < l ; q ++ {
152
+ go executeQuery (config .Queries [q ])
153
+ }
103
154
}
104
155
105
156
time .Sleep (1 * time .Second )
106
157
}
107
-
108
- wg .Wait ()
158
+ fmt .Println ("wait" )
159
+ if waitTimeout (& wg , 10 * time .Second ) {
160
+ fmt .Println ("Timed out waiting for wait group" )
161
+ } else {
162
+ fmt .Println ("Wait group finished" )
163
+ }
164
+ fmt .Println ("end wait" )
165
+ defer db .Close ()
109
166
}
110
167
111
- func loadConfig () {
112
- data , err := ioutil .ReadFile ("config.json" )
168
+ func loadConfig (configFilePath string ) {
169
+ data , err := ioutil .ReadFile (configFilePath )
170
+ if err != nil {
171
+ panic (err .Error () + " use -configFile flag" )
172
+ }
173
+
174
+ err = json .Unmarshal (data , & config )
113
175
if err != nil {
114
176
panic (err )
115
177
}
178
+ fmt .Println ("LOADED CONFIG" )
179
+ pretty , err := json .MarshalIndent (config , "" , " " )
180
+ if err != nil {
181
+ panic (err )
182
+ }
183
+ fmt .Printf ("%s\n " , string (pretty ))
116
184
117
- json .Unmarshal (data , & config )
118
- fmt .Println ("Config loaded" )
185
+ if len (config .Queries ) < 1 {
186
+ panic ("no query specified" )
187
+ }
119
188
}
0 commit comments