forked from digitalocean/go-workers2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker_test.go
201 lines (164 loc) · 4.59 KB
/
worker_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package workers
import (
"log"
"os"
"sync"
"testing"
"github.com/stretchr/testify/assert"
)
type dummyFetcher struct {
queue func() string
fetch func()
acknowledge func(*Msg)
ready func() chan bool
messages func() chan *Msg
close func()
closed func() bool
}
func (d dummyFetcher) Queue() string { return d.queue() }
func (d dummyFetcher) Fetch() { d.fetch() }
func (d dummyFetcher) Acknowledge(m *Msg) { d.acknowledge(m) }
func (d dummyFetcher) Ready() chan bool { return d.ready() }
func (d dummyFetcher) Messages() chan *Msg { return d.messages() }
func (d dummyFetcher) Close() { d.close() }
func (d dummyFetcher) Closed() bool { return d.closed() }
func TestNewWorker(t *testing.T) {
testLogger := log.New(os.Stdout, "test-go-workers2: ", log.Ldate|log.Lmicroseconds)
cc := newCallCounter()
w := newWorker(testLogger, "q", 0, cc.F)
assert.Equal(t, "q", w.queue)
assert.Equal(t, 1, w.concurrency)
assert.NotNil(t, w.stop)
assert.NotNil(t, w.handler)
w.handler(nil)
assert.Equal(t, 1, cc.count)
w = newWorker(testLogger, "q", -5, cc.F)
assert.Equal(t, 1, w.concurrency)
w = newWorker(testLogger, "q", 10, cc.F)
assert.Equal(t, 10, w.concurrency)
}
func TestWorker(t *testing.T) {
testLogger := log.New(os.Stdout, "test-go-workers2: ", log.Ldate|log.Lmicroseconds)
readyCh := make(chan bool)
msgCh := make(chan *Msg)
ackCh := make(chan *Msg)
fetchCh := make(chan bool)
var dfClosedLock sync.Mutex
var dfClosed bool
df := dummyFetcher{
queue: func() string { return "q" },
fetch: func() { close(fetchCh) },
acknowledge: func(m *Msg) { ackCh <- m },
ready: func() chan bool { return readyCh },
messages: func() chan *Msg { return msgCh },
close: func() {
dfClosedLock.Lock()
defer dfClosedLock.Unlock()
dfClosed = true
},
closed: func() bool {
dfClosedLock.Lock()
defer dfClosedLock.Unlock()
return dfClosed
},
}
cc := newCallCounter()
w := newWorker(testLogger, "q", 2, cc.F)
var wg sync.WaitGroup
go func() {
wg.Add(1)
w.start(df)
wg.Done()
}()
// This block delays until the entire worker is started.
// In order for a message to be consumed, at least one task runner
// must be started. We consume the message off of ackCh for sanity.
// Acquiring and then releasing the runnersLock ensures that start
// has finished its setup work
<-fetchCh // We should be sure that Fetch got called before providing any messages
msgCh <- cc.msg()
<-ackCh
w.runnersLock.Lock()
w.runnersLock.Unlock()
assert.True(t, w.running)
assert.Len(t, w.runners, 2)
t.Run("cannot start while running", func(t *testing.T) {
w.start(df)
// This test would time out if w.start doesn't return immediately
})
t.Run(".inProgressMessages", func(t *testing.T) {
// None running
msgs := w.inProgressMessages()
assert.Empty(t, msgs)
// Enqueue one
msgCh <- cc.syncMsg()
<-cc.syncCh
msgs = w.inProgressMessages()
assert.Len(t, msgs, 1)
// Enqueue another
msgCh <- cc.syncMsg()
<-cc.syncCh
msgs = w.inProgressMessages()
assert.Len(t, msgs, 2)
// allow one to finish
cc.ackSyncCh <- true
<-ackCh
msgs = w.inProgressMessages()
assert.Len(t, msgs, 1)
// alow the other to finish
cc.ackSyncCh <- true
<-ackCh
msgs = w.inProgressMessages()
assert.Empty(t, msgs)
})
w.quit()
wg.Wait()
}
func TestWorkerProcessesAndAcksMessages(t *testing.T) {
testLogger := log.New(os.Stdout, "test-go-workers2: ", log.Ldate|log.Lmicroseconds)
readyCh := make(chan bool)
msgCh := make(chan *Msg)
ackCh := make(chan *Msg)
closeCh := make(chan bool)
df := dummyFetcher{
queue: func() string { return "q" },
fetch: func() { <-closeCh },
acknowledge: func(m *Msg) { ackCh <- m },
ready: func() chan bool { return readyCh },
messages: func() chan *Msg { return msgCh },
close: func() { close(closeCh) },
closed: func() bool {
select {
case <-closeCh:
return true
default:
return false
}
},
}
cc := newCallCounter()
w := newWorker(testLogger, "q", 1, cc.F)
var wg sync.WaitGroup
go func() {
wg.Add(1)
w.start(df)
wg.Done()
}()
// since we have concurrency 1, messages _must_ be processed in order
msgCh <- cc.msg()
ackedMsg := <-ackCh
assert.True(t, ackedMsg.ack)
assert.NotZero(t, ackedMsg.startedAt)
assert.Equal(t, 1, cc.count)
noAck := cc.noAckMsg()
msgCh <- noAck
msgCh <- cc.msg()
ackedMsg = <-ackCh
assert.False(t, noAck.ack)
assert.NotZero(t, noAck.startedAt)
assert.True(t, ackedMsg.ack)
assert.NotZero(t, ackedMsg.startedAt)
assert.Equal(t, 3, cc.count)
w.quit()
wg.Wait()
}