Skip to content

Commit c32ff62

Browse files
Merge pull request #137 from funny-falcon/selectable_future
futures/Selectable - future with selectable completion channel
2 parents 9bf2ced + f7efbd6 commit c32ff62

File tree

2 files changed

+193
-0
lines changed

2 files changed

+193
-0
lines changed

futures/selectable.go

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
Copyright 2016 Workiva, LLC
3+
Copyright 2016 Sokolov Yura aka funny_falcon
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
18+
package futures
19+
20+
import (
21+
"errors"
22+
"sync"
23+
"sync/atomic"
24+
)
25+
26+
// ErrFutureCanceled signals that futures in canceled by a call to `f.Cancel()`
27+
var ErrFutureCanceled = errors.New("future canceled")
28+
29+
// Selectable is a future with channel exposed for external `select`.
30+
// Many simultaneous listeners may wait for result either with `f.Value()`
31+
// or by selecting/fetching from `f.WaitChan()`, which is closed when future
32+
// fulfilled.
33+
type Selectable struct {
34+
m sync.Mutex
35+
val interface{}
36+
err error
37+
wait chan struct{}
38+
filled uint32
39+
}
40+
41+
// NewSelectable returns new selectable future.
42+
func NewSelectable() *Selectable {
43+
return &Selectable{wait: make(chan struct{})}
44+
}
45+
46+
func (f *Selectable) wchan() <-chan struct{} {
47+
f.m.Lock()
48+
ch := f.wait
49+
f.m.Unlock()
50+
return ch
51+
}
52+
53+
// WaitChan returns channel, which is closed when future is fullfilled.
54+
func (f *Selectable) WaitChan() <-chan struct{} {
55+
if atomic.LoadUint32(&f.filled) == 1 {
56+
return closed
57+
}
58+
return f.wchan()
59+
}
60+
61+
// GetResult waits for future to be fullfilled and returns value or error,
62+
// whatever is set first
63+
func (f *Selectable) GetResult() (interface{}, error) {
64+
if atomic.LoadUint32(&f.filled) == 0 {
65+
<-f.wchan()
66+
}
67+
return f.val, f.err
68+
}
69+
70+
// Fill sets value for future, if it were not already fullfilled
71+
// Returns error, if it were already set to future.
72+
func (f *Selectable) Fill(v interface{}, e error) error {
73+
f.m.Lock()
74+
if f.filled == 0 {
75+
f.val = v
76+
f.err = e
77+
atomic.StoreUint32(&f.filled, 1)
78+
w := f.wait
79+
f.wait = closed
80+
close(w)
81+
}
82+
f.m.Unlock()
83+
return f.err
84+
}
85+
86+
// SetValue is alias for Fill(v, nil)
87+
func (f *Selectable) SetValue(v interface{}) error {
88+
return f.Fill(v, nil)
89+
}
90+
91+
// SetError is alias for Fill(nil, e)
92+
func (f *Selectable) SetError(e error) {
93+
f.Fill(nil, e)
94+
}
95+
96+
// Cancel is alias for SetError(ErrFutureCanceled)
97+
func (f *Selectable) Cancel() {
98+
f.SetError(ErrFutureCanceled)
99+
}
100+
101+
var closed = make(chan struct{})
102+
103+
func init() {
104+
close(closed)
105+
}

futures/selectable_test.go

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
Copyright 2016 Workiva, LLC
3+
Copyright 2016 Sokolov Yura aka funny_falcon
4+
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
18+
package futures
19+
20+
import (
21+
"fmt"
22+
"sync"
23+
"testing"
24+
"time"
25+
26+
"github.com/stretchr/testify/assert"
27+
)
28+
29+
func TestSelectableGetResult(t *testing.T) {
30+
f := NewSelectable()
31+
var result interface{}
32+
var err error
33+
var wg sync.WaitGroup
34+
wg.Add(1)
35+
go func() {
36+
result, err = f.GetResult()
37+
wg.Done()
38+
}()
39+
40+
f.SetValue(`test`)
41+
wg.Wait()
42+
43+
assert.Nil(t, err)
44+
assert.Equal(t, `test`, result)
45+
46+
// ensure we don't get paused on the next iteration.
47+
result, err = f.GetResult()
48+
49+
assert.Equal(t, `test`, result)
50+
assert.Nil(t, err)
51+
}
52+
53+
func TestSelectableSetError(t *testing.T) {
54+
f := NewSelectable()
55+
select {
56+
case <-f.WaitChan():
57+
case <-time.After(0):
58+
f.SetError(fmt.Errorf("timeout"))
59+
}
60+
61+
result, err := f.GetResult()
62+
63+
assert.Nil(t, result)
64+
assert.NotNil(t, err)
65+
}
66+
67+
func BenchmarkSelectable(b *testing.B) {
68+
timeout := time.After(30 * time.Minute)
69+
var wg sync.WaitGroup
70+
71+
b.ResetTimer()
72+
73+
for i := 0; i < b.N; i++ {
74+
wg.Add(1)
75+
f := NewSelectable()
76+
go func() {
77+
select {
78+
case <-f.WaitChan():
79+
case <-timeout:
80+
f.SetError(fmt.Errorf("timeout"))
81+
}
82+
wg.Done()
83+
}()
84+
85+
f.SetValue(`test`)
86+
wg.Wait()
87+
}
88+
}

0 commit comments

Comments
 (0)