-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun.go
93 lines (73 loc) · 1.73 KB
/
run.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package main
import (
"sync/atomic"
"time"
"github.com/bitgn/layers/go/benchmark/bench"
)
// Package-level gauges updated by the load generators in this file.
var (
// pendingRequests is incremented (atomically) just before a request is
// executed and decremented after its result has been sent on the metrics
// channel.
pendingRequests int64
// waitingRequests is set by runFixedThroughput on every tick to the number
// of requests that were planned by that moment but not yet sent.
waitingRequests int64
)
// runBenchmark selects the load-generation mode from the sign of hz.
//
// A positive hz runs the fixed-throughput generator at hz requests per
// second. Zero or a negative hz runs the adaptive generator with -hz
// concurrent workers (so hz == 0 starts no workers at all).
func runBenchmark(ms chan metrics, hz int, l bench.Launcher) {
	if hz <= 0 {
		// Non-positive values encode the desired concurrency as -hz.
		runAdaptiveThroughput(ms, -hz, l)
		return
	}
	runFixedThroughput(ms, hz, l)
}
// runAdaptiveThroughput starts `concurrency` worker goroutines and returns
// immediately. Each worker owns its own generator from NewXorShift and loops
// forever: it draws the next value, runs l.Exec with it, and reports the
// elapsed time and error flag on ms.
//
// pendingRequests is raised before Exec and lowered only after the result has
// been handed to ms, so a slow consumer of ms keeps the request counted as
// pending.
func runAdaptiveThroughput(ms chan metrics, concurrency int, l bench.Launcher) {
	for worker := 0; worker < concurrency; worker++ {
		gen := NewXorShift()
		go func() {
			for {
				startedAt := time.Now()
				arg := gen.Next()
				atomic.AddInt64(&pendingRequests, 1)

				failed := l.Exec(arg) != nil
				ms <- metrics{
					error:       failed,
					nanoseconds: time.Since(startedAt).Nanoseconds(),
				}

				atomic.AddInt64(&pendingRequests, -1)
			}
		}()
	}
}
// runFixedThroughput issues requests at a target rate of hz per second.
//
// On every tick it computes how many requests should have been sent since the
// start, and launches one goroutine per missing request so that a slow or
// lagging ticker cannot lower the overall rate. Each goroutine runs l.Exec and
// reports the elapsed time and error flag on ms.
//
// hz is expected to be positive (runBenchmark only calls this with hz > 0).
// The function loops over a time.Tick channel and has no exit path of its own,
// so callers should not expect it to return.
func runFixedThroughput(ms chan metrics, hz int, l bench.Launcher) {
	var sent int
	xor := NewXorShift()
	started := time.Now()
	period := time.Duration(float64(time.Second) / float64(hz))

	for range time.Tick(period) {
		begin := time.Now()
		// NOTE(review): one value is drawn per tick, so every catch-up request
		// launched on the same tick shares it — confirm this is intended.
		x := xor.Next()

		// The ticker might be slow or lagging, so track how many requests
		// should have been sent by now.
		elapsed := begin.Sub(started)
		planned := int(elapsed.Seconds() * float64(hz))
		missing := planned - sent

		// Publish the backlog atomically, matching how pendingRequests is
		// updated; a plain write here would race with concurrent readers.
		atomic.StoreInt64(&waitingRequests, int64(missing))

		// Don't trust the ticker to catch up; just send all missing requests.
		for i := 0; i < missing; i++ {
			atomic.AddInt64(&pendingRequests, 1)
			go func() {
				err := l.Exec(x)
				// Latency is measured from the tick time, so it includes any
				// delay between the tick and the goroutine actually running.
				total := time.Since(begin)
				result := metrics{
					error:       err != nil,
					nanoseconds: total.Nanoseconds(),
				}
				ms <- result
				atomic.AddInt64(&pendingRequests, -1)
			}()
			sent++
		}
	}
}
// metrics is the outcome of a single benchmark request, as sent on the
// results channel by the load generators.
type metrics struct {
// nanoseconds is the time elapsed between the recorded start of the request
// and the return of the launcher's Exec call.
nanoseconds int64
// error is true when Exec returned a non-nil error.
error bool
}