Skip to content

Commit e64dacd

Browse files
committed
Improve BufferedChannelQueue efficiency
1 parent f11bff5 commit e64dacd

File tree

2 files changed

+29
-14
lines changed

2 files changed

+29
-14
lines changed

queue.go

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,16 @@ func (q *BufferedChannelQueue) Put(val interface{}) error {
584584
// return ErrQueueIsClosed
585585
// }
586586
//
587-
// return q.blockingQueue.Put(val)
587+
// q.lock.Lock()
588+
// poolCount := q.pool.Count()
589+
//
590+
// // If appearing nothing in the pool
591+
// if poolCount == 0 {
592+
// defer q.lock.Unlock()
593+
// // Try channel
594+
// return q.blockingQueue.Put(val)
595+
// }
596+
// q.lock.Unlock()
588597

589598
return q.Offer(val)
590599
}
@@ -638,23 +647,31 @@ func (q *BufferedChannelQueue) Offer(val interface{}) error {
638647
return ErrQueueIsClosed
639648
}
640649

650+
poolCount := q.pool.Count()
651+
652+
// If appearing nothing in the pool
653+
if poolCount == 0 {
654+
// Try channel
655+
err := q.blockingQueue.Offer(val)
656+
if err == nil {
657+
// Success
658+
return nil
659+
} else if err == ErrQueueIsFull {
660+
// Do nothing and let pool.Offer(val)
661+
} else {
662+
// Other
663+
return err
664+
}
665+
}
666+
641667
// Before +1: >=, After +1: >
642-
if q.pool.Count() >= q.bufferSizeMaximum {
668+
if poolCount >= q.bufferSizeMaximum {
643669
return ErrQueueIsFull
644670
}
645671

646672
q.pool.Offer(val)
647673
q.loadWorkerCh.Offer(1)
648674
return nil
649-
650-
// err := q.blockingQueue.Offer(val)
651-
// if err == ErrQueueIsFull {
652-
// q.pool.Offer(val)
653-
// q.loadWorkerCh.Offer(1)
654-
// return nil
655-
// }
656-
//
657-
// return err
658675
}
659676

660677
// Poll Poll the val(non-blocking)

queue_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,13 +275,10 @@ func TestNewBufferedChannelQueue(t *testing.T) {
275275

276276
err = queue.Offer(1)
277277
assert.Equal(t, nil, err)
278-
time.Sleep(1 * timeout)
279278
err = queue.Offer(2)
280279
assert.Equal(t, nil, err)
281-
time.Sleep(1 * timeout)
282280
err = queue.Offer(3)
283281
assert.Equal(t, nil, err)
284-
time.Sleep(1 * timeout)
285282
// Channel: only 3 positions & Buffer: 1 position, now `4` is inserted into the buffer(buffer size: 1)
286283
err = queue.Offer(4)
287284
assert.Equal(t, nil, err)
@@ -319,6 +316,7 @@ func TestNewBufferedChannelQueue(t *testing.T) {
319316
go func() {
320317
for i := 1; i <= 10000; i++ {
321318
// err := bufferedChannelQueue.PutWithTimeout(i, timeout)
319+
// err := bufferedChannelQueue.Put(i)
322320
err := bufferedChannelQueue.Offer(i)
323321
assert.Equal(t, nil, err)
324322
}

0 commit comments

Comments
 (0)