Skip to content

Commit d7e1264

Browse files
committed
[xcontainer] move DelayQueue to queue
1 parent 9954f3c commit d7e1264

File tree

7 files changed

+95
-97
lines changed

7 files changed

+95
-97
lines changed

go.mod

+1-2
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,11 @@ go 1.13
55
require (
66
github.com/bwmarrin/snowflake v0.3.0
77
github.com/davecgh/go-spew v1.1.1
8-
github.com/go-redsync/redsync v1.4.1
8+
github.com/go-redsync/redsync v1.4.2
99
github.com/gofrs/uuid v3.2.0+incompatible
1010
github.com/gomodule/redigo v2.0.0+incompatible
1111
github.com/google/btree v1.0.0
1212
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
13-
github.com/smartystreets-prototypes/go-disruptor v0.0.0-20200316140655-c96477fd7a6a
1413
github.com/spaolacci/murmur3 v1.1.0
1514
github.com/spf13/afero v1.2.2
1615
github.com/stretchr/testify v1.5.1

go.sum

+2-4
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/
33
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
44
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
55
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
6-
github.com/go-redsync/redsync v1.4.1 h1:HsNhNnEF+56PJEKQtn3sji97BK0uqJfskOUenoUsBus=
7-
github.com/go-redsync/redsync v1.4.1/go.mod h1:my8/M5YL986u2jBMtZTLkBIgBsKNNSixJWzWwISH6Uw=
6+
github.com/go-redsync/redsync v1.4.2 h1:KADEZ2rlaHMZWnlkthQCxfGP+8ZWwJLiSjOYN3mntKA=
7+
github.com/go-redsync/redsync v1.4.2/go.mod h1:my8/M5YL986u2jBMtZTLkBIgBsKNNSixJWzWwISH6Uw=
88
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
99
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
1010
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
@@ -22,8 +22,6 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWb
2222
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
2323
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
2424
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
25-
github.com/smartystreets-prototypes/go-disruptor v0.0.0-20200316140655-c96477fd7a6a h1:mHEYm/fBGwtGwgIW/tlprwUd2syvcue8oosLVuQscic=
26-
github.com/smartystreets-prototypes/go-disruptor v0.0.0-20200316140655-c96477fd7a6a/go.mod h1:slFCjqF2v0VgmCeB+J4uEy0d7HAgLkgEjVrG0DPO67M=
2725
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
2826
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
2927
github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc=

iter/ints.go iter/numberic.go

+29
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,32 @@ func (s *intSliceIterator) Next() (interface{}, bool) {
3434
}
3535
return item, true
3636
}
37+
38+
type Int64Slice types.Int64Slice
39+
40+
func (i Int64Slice) Iter() Iterator {
41+
ch := make(chan int64, 1)
42+
go func(ch chan int64) {
43+
for _, x := range i {
44+
ch <- x
45+
}
46+
}(ch)
47+
return &int64SliceIterator{next: ch}
48+
}
49+
50+
type int64SliceIterator struct {
51+
next chan int64
52+
stopped int32
53+
}
54+
55+
func (s *int64SliceIterator) Next() (interface{}, bool) {
56+
if atomic.LoadInt32(&s.stopped) == 1 {
57+
return nil, false
58+
}
59+
item, ok := <-s.next
60+
if !ok {
61+
atomic.StoreInt32(&s.stopped, 1)
62+
return nil, false
63+
}
64+
return item, true
65+
}

xcontainer/queue/block_queue.go

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package queue
2+
3+
import (
4+
"time"
5+
)
6+
7+
type BlockQueue struct {
8+
capacity int
9+
ch chan interface{}
10+
}
11+
12+
func (q *BlockQueue) Capacity() int {
13+
return q.capacity
14+
}
15+
16+
func (q *BlockQueue) Push(x interface{}, timeout time.Duration) bool {
17+
t := time.NewTimer(timeout)
18+
select {
19+
case q.ch <- x:
20+
return true
21+
case <-t.C:
22+
return false
23+
}
24+
}
25+
26+
func (q *BlockQueue) Pop(timeout time.Duration) (interface{}, bool) {
27+
t := time.NewTimer(timeout)
28+
select {
29+
case <-t.C:
30+
return nil, false
31+
case x := <-q.ch:
32+
return x, true
33+
}
34+
}
35+
36+
func (q *BlockQueue) BlockPop() (interface{}, bool) {
37+
val, ok := <-q.ch
38+
return val, ok
39+
}
40+
41+
func NewBlockingQueue(n int) *BlockQueue {
42+
return &BlockQueue{
43+
capacity: n,
44+
ch: make(chan interface{}, n),
45+
}
46+
}

xcontainer/delay_queue/delay_queue.go xcontainer/queue/delay_queue.go

+12-11
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package delay_queue
1+
package queue
22

33
import (
44
"math"
@@ -8,13 +8,13 @@ import (
88
"github.com/go-board/x-go/xcontainer/priority_queue"
99
)
1010

11-
type Task struct {
11+
type DelayTask struct {
1212
data interface{}
1313
at time.Time
1414
}
1515

16-
func (t Task) Compare(o types.Comparable) types.Ordering {
17-
oo := o.(*Task)
16+
func (t DelayTask) Compare(o types.Comparable) types.Ordering {
17+
oo := o.(*DelayTask)
1818
if t.at.Before(oo.at) {
1919
return types.OrderingLess
2020
}
@@ -24,28 +24,28 @@ func (t Task) Compare(o types.Comparable) types.Ordering {
2424
return types.OrderingGreater
2525
}
2626

27-
func NewTask(v interface{}, at time.Time) *Task {
28-
return &Task{data: v, at: at}
27+
func NewTimedTask(v interface{}, at time.Time) *DelayTask {
28+
return &DelayTask{data: v, at: at}
2929
}
3030

31-
func NewDelayTask(v interface{}, d time.Duration) *Task {
32-
return NewTask(v, time.Now().Add(d))
31+
func NewDelayTask(v interface{}, d time.Duration) *DelayTask {
32+
return NewTimedTask(v, time.Now().Add(d))
3333
}
3434

3535
type DelayQueue struct {
3636
q *priority_queue.PriorityQueue
3737
notify chan struct{}
3838
}
3939

40-
func New() *DelayQueue {
40+
func NewDelayQueue() *DelayQueue {
4141
return &DelayQueue{
4242
q: priority_queue.NewComparablePriorityQueue(false),
4343
notify: make(chan struct{}, 1),
4444
}
4545
}
4646

4747
// Push new data into delay queue.
48-
func (q *DelayQueue) Push(task *Task) {
48+
func (q *DelayQueue) Push(task *DelayTask) {
4949
q.q.Push(task)
5050
// this should not block
5151
select {
@@ -65,7 +65,7 @@ func (q *DelayQueue) popNearest() (interface{}, time.Duration, bool) {
6565
if task == nil {
6666
return nil, time.Duration(math.MaxInt64), false
6767
}
68-
t := task.(*Task)
68+
t := task.(*DelayTask)
6969
now := time.Now()
7070
duration := t.at.Sub(now)
7171
if !t.at.After(now) {
@@ -87,6 +87,7 @@ func (q *DelayQueue) BlockPop() interface{} {
8787
}
8888
return v
8989
}
90+
// block until timeout or new element pushed
9091
select {
9192
case <-time.NewTimer(duration).C:
9293
case <-q.notify:

xcontainer/queue/queue_test.go

+5-79
Original file line numberDiff line numberDiff line change
@@ -1,99 +1,25 @@
11
package queue
22

33
import (
4-
"sync/atomic"
54
"testing"
65
"time"
76

8-
"github.com/smartystreets-prototypes/go-disruptor"
97
"github.com/stretchr/testify/require"
108
)
119

1210
func TestBlockingQueue(t *testing.T) {
1311
t.Run("put", func(t *testing.T) {
1412
q := NewBlockingQueue(1)
15-
require.Equal(t, true, q.Put(1, time.Second), "should not block when putting first element")
16-
require.Equal(t, false, q.Put(2, time.Second), "should block when putting second element")
13+
require.Equal(t, true, q.Push(1, time.Second), "should not block when putting first element")
14+
require.Equal(t, false, q.Push(2, time.Second), "should block when putting second element")
1715
})
1816
t.Run("get", func(t *testing.T) {
1917
q := NewBlockingQueue(1)
20-
require.Equal(t, true, q.Put(1, time.Second), "should not block when putting first element")
21-
x, ok := q.Get(time.Second)
18+
require.Equal(t, true, q.Push(1, time.Second), "should not block when putting first element")
19+
x, ok := q.Pop(time.Second)
2220
require.Equal(t, true, ok, "should get first element")
2321
require.Equal(t, 1, x, "first element should be 1")
24-
_, ok = q.Get(time.Second)
22+
_, ok = q.Pop(time.Second)
2523
require.Equal(t, false, ok, "should get nothing when get again")
2624
})
2725
}
28-
29-
type testItem struct {
30-
i int
31-
}
32-
33-
const RingBufferCapacity = 1 << 14 // must be a power of 2
34-
const RingBufferMask = RingBufferCapacity - 1
35-
36-
// this instance will be shared among producers and consumers of this application
37-
var ringBuffer = [RingBufferCapacity]int{}
38-
39-
type disruptorConsumer struct{}
40-
41-
func (d disruptorConsumer) Consume(lower, upper int64) {
42-
for sequence := lower; sequence <= upper; sequence++ {
43-
44-
_ = ringBuffer[sequence&RingBufferMask] // see performance note on producer sample above
45-
46-
// handle the incoming message with your application code
47-
}
48-
}
49-
50-
func BenchmarkDisruptor(b *testing.B) {
51-
b.Run("put", func(b *testing.B) {
52-
writer, _ := disruptor.New(
53-
disruptor.WithCapacity(1<<14),
54-
disruptor.WithConsumerGroup(disruptorConsumer{}))
55-
b.ResetTimer()
56-
b.ReportAllocs()
57-
i := int64(0)
58-
b.RunParallel(func(pb *testing.PB) {
59-
for pb.Next() {
60-
reservation := writer.Reserve(1)
61-
ringBuffer[atomic.AddInt64(&i, 1)&RingBufferMask] = 42 // example of incoming value from a network operation such as HTTP, TCP, UDP, etc.
62-
writer.Commit(reservation, reservation)
63-
}
64-
})
65-
})
66-
}
67-
68-
func BenchmarkChan(b *testing.B) {
69-
b.Run("get", func(b *testing.B) {
70-
b.ReportAllocs()
71-
b.ResetTimer()
72-
ch := make(chan testItem, 10000)
73-
go func() {
74-
for {
75-
ch <- testItem{i: 0}
76-
}
77-
}()
78-
b.RunParallel(func(pb *testing.PB) {
79-
for pb.Next() {
80-
<-ch
81-
}
82-
})
83-
})
84-
b.Run("put", func(b *testing.B) {
85-
b.ReportAllocs()
86-
b.ResetTimer()
87-
ch := make(chan testItem, 10000)
88-
go func() {
89-
for {
90-
<-ch
91-
}
92-
}()
93-
b.RunParallel(func(pb *testing.PB) {
94-
for pb.Next() {
95-
ch <- testItem{i: 0}
96-
}
97-
})
98-
})
99-
}

xslice/find.go

-1
This file was deleted.

0 commit comments

Comments
 (0)