Skip to content

Commit c3722ef

Browse files
committed
fix queue.Poll() memory leak
- `sema` was not getting removed from `waiters` list after `queue.Poll()` timeout. So, continuously calling `queue.Poll()` with a timeout would eventually OOM the program if nothing was ever added to the queue. - also added more `waiters` test coverage
1 parent c99f2e5 commit c3722ef

File tree

2 files changed

+93
-0
lines changed

2 files changed

+93
-0
lines changed

queue/queue.go

+19
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,21 @@ func (w *waiters) put(sema *sema) {
7676
*w = append(*w, sema)
7777
}
7878

79+
func (w *waiters) remove(sema *sema) {
80+
if len(*w) == 0 {
81+
return
82+
}
83+
// build new slice, copy all except sema
84+
ws := *w
85+
newWs := make(waiters, 0, len(*w))
86+
for i := range ws {
87+
if ws[i] != sema {
88+
newWs = append(newWs, ws[i])
89+
}
90+
}
91+
*w = newWs
92+
}
93+
7994
type items []interface{}
8095

8196
func (items *items) get(number int64) []interface{} {
@@ -237,6 +252,10 @@ func (q *Queue) Poll(number int64, timeout time.Duration) ([]interface{}, error)
237252
select {
238253
case sema.ready <- true:
239254
// we called this before Put() could
255+
// Remove sema from waiters.
256+
q.lock.Lock()
257+
q.waiters.remove(sema)
258+
q.lock.Unlock()
240259
default:
241260
// Put() got it already, we need to call Done() so Put() can move on
242261
sema.response.Done()

queue/queue_test.go

+74
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,18 @@ func TestPoll(t *testing.T) {
125125
assert.Equal(t, ErrTimeout, err)
126126
}
127127

128+
func TestPollNoMemoryLeak(t *testing.T) {
129+
q := New(0)
130+
131+
assert.Len(t, q.waiters, 0)
132+
133+
for i := 0; i < 10; i++ {
134+
// Poll() should cleanup waiters after timeout
135+
q.Poll(1, time.Nanosecond)
136+
assert.Len(t, q.waiters, 0)
137+
}
138+
}
139+
128140
func TestAddEmptyPut(t *testing.T) {
129141
q := New(10)
130142

@@ -419,6 +431,68 @@ func TestTakeUntilOnDisposedQueue(t *testing.T) {
419431
assert.IsType(t, ErrDisposed, err)
420432
}
421433

434+
func TestWaiters(t *testing.T) {
435+
s1, s2, s3, s4 := newSema(), newSema(), newSema(), newSema()
436+
437+
w := waiters{}
438+
assert.Len(t, w, 0)
439+
440+
//
441+
// test put()
442+
w.put(s1)
443+
assert.Equal(t, waiters{s1}, w)
444+
445+
w.put(s2)
446+
w.put(s3)
447+
w.put(s4)
448+
assert.Equal(t, waiters{s1, s2, s3, s4}, w)
449+
450+
//
451+
// test remove()
452+
//
453+
// remove from middle
454+
w.remove(s2)
455+
assert.Equal(t, waiters{s1, s3, s4}, w)
456+
457+
// remove non-existing element
458+
w.remove(s2)
459+
assert.Equal(t, waiters{s1, s3, s4}, w)
460+
461+
// remove from beginning
462+
w.remove(s1)
463+
assert.Equal(t, waiters{s3, s4}, w)
464+
465+
// remove from end
466+
w.remove(s4)
467+
assert.Equal(t, waiters{s3}, w)
468+
469+
// remove last element
470+
w.remove(s3)
471+
assert.Empty(t, w)
472+
473+
// remove non-existing element
474+
w.remove(s3)
475+
assert.Empty(t, w)
476+
477+
//
478+
// test get()
479+
//
480+
// start with 3 elements in list
481+
w.put(s1)
482+
w.put(s2)
483+
w.put(s3)
484+
assert.Equal(t, waiters{s1, s2, s3}, w)
485+
486+
// get() returns each item in insertion order
487+
assert.Equal(t, s1, w.get())
488+
assert.Equal(t, s2, w.get())
489+
w.put(s4) // interleave a put(), item should go to the end
490+
assert.Equal(t, s3, w.get())
491+
assert.Equal(t, s4, w.get())
492+
assert.Empty(t, w)
493+
assert.Nil(t, w.get())
494+
}
495+
422496
func TestExecuteInParallel(t *testing.T) {
423497
q := New(10)
424498
for i := 0; i < 10; i++ {

0 commit comments

Comments
 (0)