Skip to content

Commit d0874b2

Browse files
committed
Played around with a retry queue
1 parent 2e39663 commit d0874b2

File tree

4 files changed

+184
-1
lines changed

4 files changed

+184
-1
lines changed

queue/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ A single ended queue with push and pop functionality implemented with channels f
44

55
Because this queue is type specific, it should probably not be imported, and instead simply implemented in a project that uses this. Copy the implementation and retype as needed. For this reason, the struct is implemented as private.
66

7-
There are optional add-ons for this implementation such as a queue counter to prevent the queue from being cycled through excessively.
7+
There are optional add-ons for this implementation such as a queue counter to prevent the queue from being cycled through excessively. An example implementation is included in `run.go`.

retry/README.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# retry
2+
3+
Implements a retry queue with an input channel and internal buffered channel. The retry queue will continually attempt to retry a given action on the elements of that queue, until both no more input has been recieved for a number of seconds and the length of the queue is unchanged for at least one cycle through the queue.
4+
5+
Because this queue is type specific, it should probably not be imported, and instead simply implemented in a project that uses this. Copy the implementation and retype as needed. For this reason, the struct is implemented as private and typed as integer.

retry/retry.go

+119
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package retry
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"time"
7+
)
8+
9+
// Specifies an element of the retry queue. The element must have methods that process
10+
// and record failure to process. The processing method will return a boolean representing
11+
// whether processing should be tried again.
12+
type Element interface {
13+
// Process this element. Return true if the process shoud be tried again.
14+
Process() bool
15+
// Record failure to process.
16+
Fail()
17+
}
18+
19+
// Retry is a queue with an automatic retry capability. Elements added to the queue are
20+
// processed and retried until the input is closed and
21+
type Retry struct {
22+
input chan Element
23+
retry chan Element
24+
wait sync.WaitGroup
25+
}
26+
27+
// Build an empty retry queue
28+
func New(capacity uint) *Retry {
29+
r := &Retry{
30+
input: make(chan Element),
31+
retry: make(chan Element, capacity),
32+
}
33+
r.run()
34+
return r
35+
}
36+
37+
// Add a new element to the retry queue
38+
func (r *Retry) Push(e Element) {
39+
r.input <- e
40+
}
41+
42+
// Close channel, indicating that no further input will be passed
43+
func (r *Retry) Close() {
44+
close(r.input)
45+
}
46+
47+
func (r *Retry) Wait() {
48+
r.wait.Wait()
49+
}
50+
51+
// run starts the processing of elements
52+
func (r *Retry) run() {
53+
54+
inputTimeout := make(chan bool, 1)
55+
56+
// input goroutine
57+
go func() {
58+
for elem := range r.input {
59+
tryagain := elem.Process()
60+
if tryagain {
61+
r.retry <- elem
62+
}
63+
}
64+
inputTimeout <- true
65+
}()
66+
67+
// retry goroutine
68+
go func() {
69+
70+
r.wait.Add(1)
71+
72+
cntr := 0
73+
rlen := 0
74+
inputDone := false
75+
retryDone := false
76+
77+
for elem := range r.retry {
78+
79+
// if done, process all remaining elements
80+
if retryDone {
81+
elem.Fail()
82+
continue
83+
}
84+
85+
// process current element
86+
tryagain := elem.Process()
87+
if tryagain {
88+
r.retry <- elem
89+
}
90+
91+
// if the input channel is closed, start monitoring for shutdown conditions
92+
select {
93+
case <-inputTimeout:
94+
inputDone = true
95+
rlen = len(r.retry)
96+
case <-time.After(1 * time.Second):
97+
continue
98+
default:
99+
}
100+
101+
// if input is done, loop through all elements once then shutdown
102+
if inputDone {
103+
fmt.Printf("Input closed, loop %d\n", cntr)
104+
cntr = cntr + 1
105+
106+
if cntr > rlen {
107+
retryDone = true
108+
close(r.retry)
109+
}
110+
111+
}
112+
113+
}
114+
115+
r.wait.Done()
116+
117+
}()
118+
119+
}

retry/retry_test.go

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package retry
2+
3+
import (
4+
"fmt"
5+
"math/rand"
6+
"testing"
7+
"time"
8+
)
9+
10+
type example struct {
11+
i int
12+
}
13+
14+
func (e *example) Process() bool {
15+
if rand.Intn(3) < 1 {
16+
fmt.Println("Fail on", e.i)
17+
return true
18+
}
19+
fmt.Println("Success on", e.i)
20+
return false
21+
}
22+
23+
func (e *example) Fail() {
24+
fmt.Println("Abandoning", e.i)
25+
}
26+
27+
func TestRetry(t *testing.T) {
28+
29+
fmt.Println("Starting test")
30+
31+
rand.Seed(time.Now().UnixNano())
32+
33+
// setup
34+
35+
capacity := uint(10)
36+
37+
genelem := func() *example {
38+
return &example{rand.Int()}
39+
}
40+
41+
count := 100
42+
43+
// execution
44+
45+
r := New(capacity)
46+
47+
for i := 0; i < count; i++ {
48+
e := genelem()
49+
fmt.Println("New element", e.i)
50+
r.Push(e)
51+
}
52+
53+
r.Close()
54+
55+
fmt.Println("Waiting")
56+
57+
r.Wait()
58+
59+
}

0 commit comments

Comments
 (0)