Skip to content

Commit 9bf2ced

Browse files
Merge pull request #139 from mikeatlas/queue-peek
added queue.Peek() functionality
2 parents 93a32f6 + 89f0067 commit 9bf2ced

File tree

3 files changed

+58
-0
lines changed

3 files changed

+58
-0
lines changed

queue/error.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,8 @@ var (
2525

2626
// ErrTimeout is returned when an applicable queue operation times out.
2727
ErrTimeout = errors.New(`queue: poll timed out`)
28+
29+
// ErrEmptyQueue is returned when an non-applicable queue operation was called
30+
// due to the queue's empty item state
31+
ErrEmptyQueue = errors.New(`queue: empty queue`)
2832
)

queue/queue.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,16 @@ func (items *items) get(number int64) []interface{} {
9595
return returnItems
9696
}
9797

98+
func (items *items) peek() (interface{}, bool) {
99+
length := len(*items)
100+
101+
if length == 0 {
102+
return nil, false
103+
}
104+
105+
return (*items)[0], true
106+
}
107+
98108
func (items *items) getUntil(checker func(item interface{}) bool) []interface{} {
99109
length := len(*items)
100110

@@ -239,6 +249,24 @@ func (q *Queue) Poll(number int64, timeout time.Duration) ([]interface{}, error)
239249
return items, nil
240250
}
241251

252+
// Peek returns a the first item in the queue by value
253+
// without modifying the queue.
254+
func (q *Queue) Peek() (interface{}, error) {
255+
q.lock.Lock()
256+
defer q.lock.Unlock()
257+
258+
if q.disposed {
259+
return nil, ErrDisposed
260+
}
261+
262+
peekItem, ok := q.items.peek()
263+
if !ok {
264+
return nil, ErrEmptyQueue
265+
}
266+
267+
return peekItem, nil
268+
}
269+
242270
// TakeUntil takes a function and returns a list of items that
243271
// match the checker until the checker returns false. This does not
244272
// wait if there are no items in the queue.

queue/queue_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,32 @@ func BenchmarkChannel(b *testing.B) {
329329
wg.Wait()
330330
}
331331

332+
func TestPeek(t *testing.T) {
333+
q := New(10)
334+
q.Put(`a`)
335+
q.Put(`b`)
336+
q.Put(`c`)
337+
peekResult, err := q.Peek()
338+
peekExpected := `a`
339+
assert.Nil(t, err)
340+
assert.Equal(t, q.Len(), int64(3))
341+
assert.Equal(t, peekExpected, peekResult)
342+
343+
popResult, err := q.Get(1)
344+
assert.Nil(t, err)
345+
assert.Equal(t, peekResult, popResult[0])
346+
assert.Equal(t, q.Len(), int64(2))
347+
}
348+
349+
func TestPeekOnDisposedQueue(t *testing.T) {
350+
q := New(10)
351+
q.Dispose()
352+
result, err := q.Peek()
353+
354+
assert.Nil(t, result)
355+
assert.IsType(t, ErrDisposed, err)
356+
}
357+
332358
func TestTakeUntil(t *testing.T) {
333359
q := New(10)
334360
q.Put(`a`, `b`, `c`)

0 commit comments

Comments
 (0)