Skip to content

Commit 88eeba8

Browse files
committed
added event based scheduling
1 parent fd5cc8b commit 88eeba8

File tree

4 files changed

+81
-23
lines changed

4 files changed

+81
-23
lines changed

new.go

+31-3
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,38 @@ import (
77
"golang.org/x/sync/errgroup"
88
)
99

10-
// NewDeferredErrGroup provides a new instance of Scheduler and a contexct derived from the input context.
11-
func NewDeferredErrGroup(ctx context.Context, dur time.Duration) (Scheduler, context.Context) {
10+
// New creates a new scheduler based on an input context and trigger mode.
11+
// Scheduler will cancel all pending jobs if the input context is cancelled.
12+
// Scheduler will trigger jobs based on trigger.
13+
// It returns a context that should be used in each function.
14+
// This output context is cancelled on first error in any of the scheduled jobs.
15+
func New(ctx context.Context, trig Trigger) (Scheduler, context.Context) {
1216
s := new(deferredErrGroupScheduler)
1317
eg, ctx := errgroup.WithContext(ctx)
14-
s.eg, s.dur, s.m = eg, dur, make(map[string]context.CancelFunc)
18+
s.eg, s.trig, s.m = eg, trig, make(map[string]context.CancelFunc)
1519
return s, ctx
1620
}
21+
22+
// NewTimeoutTrigger provides a new trigger based on a timeout.
23+
func NewTimeoutTrigger(dur time.Duration) Trigger {
24+
return Trigger{value: dur}
25+
}
26+
27+
// NewContextTrigger provides a new trigger based on a context.
28+
// Trigger fires when input context context is done.
29+
func NewContextTrigger(ctx context.Context) Trigger {
30+
return Trigger{value: ctx}
31+
}
32+
33+
// NewChannelTrigger provides a new trigger based on a channel.
34+
// Trigger fires when input chan is read.
35+
func NewChannelTrigger(c chan struct{}) Trigger {
36+
ctx, cancel := context.WithCancel(context.Background())
37+
go func() {
38+
select {
39+
case <-c:
40+
cancel()
41+
}
42+
}()
43+
return Trigger{value: ctx}
44+
}

deferredEg.go scheduler.go

+19-15
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,11 @@ package scheduler
22

33
import (
44
"context"
5-
"sync"
65
"time"
76

87
"github.com/google/uuid"
9-
"golang.org/x/sync/errgroup"
108
)
119

12-
// deferredErrGroupScheduler implements Scheduler interface providing deferred execution for a func.
13-
type deferredErrGroupScheduler struct {
14-
// dur is time duration to wait before executing func.
15-
dur time.Duration
16-
// m is map of cancel func that can be accessed using keys.
17-
m map[string]context.CancelFunc
18-
// mu is a lock for modifying above map.
19-
mu sync.Mutex
20-
// eg is error group
21-
eg *errgroup.Group
22-
}
23-
2410
// Func implements Scheduler interface to provide a functional literal for use in errgroup.
2511
// Such func will execute the input func deferred in time.
2612
func (d *deferredErrGroupScheduler) Go(ctx context.Context,
@@ -36,10 +22,28 @@ func (d *deferredErrGroupScheduler) Go(ctx context.Context,
3622
d.m[key] = cancelFunc
3723
d.mu.Unlock()
3824

25+
var trig context.Context
26+
var trigCancelFunc context.CancelFunc
27+
28+
switch v := d.trig.value.(type) {
29+
case time.Duration:
30+
// create a context based on timeout
31+
trig, trigCancelFunc = context.WithTimeout(context.Background(), v)
32+
// spawn a go-routine that will call cancel func once triggered
33+
go func() {
34+
select {
35+
case <-trig.Done():
36+
trigCancelFunc()
37+
}
38+
}()
39+
case context.Context:
40+
trig = v
41+
}
42+
3943
// spawn func within an error group.
4044
d.eg.Go(func() error {
4145
select {
42-
case <-time.After(d.dur):
46+
case <-trig.Done():
4347
err := f()
4448
d.Cancel(key)
4549
return err

deferredEg_test.go scheduler_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func Test_Exec_NormalBehavior(t *testing.T) {
1717

1818
// get a new deferredErrGroupScheduler with a 1 second timeout.
1919
// functions will be executed after one second delay.
20-
group, ctx := NewDeferredErrGroup(ctx, time.Second)
20+
group, ctx := New(ctx, NewTimeoutTrigger(time.Second))
2121

2222
// defer execute func setting value of execTime to be 1.
2323
group.Go(ctx, func() error { execTime = time.Now(); return nil })
@@ -46,7 +46,7 @@ func Test_Exec_GloabalTimeout(t *testing.T) {
4646

4747
// get a new deferredErrGroupScheduler with a 2 second timeout.
4848
// functions will be executed after one second delay.
49-
group, ctx := NewDeferredErrGroup(ctx, 2*time.Second)
49+
group, ctx := New(ctx, NewTimeoutTrigger(2*time.Second))
5050

5151
// defer execute func.
5252
group.Go(ctx, func() error { c = 1; return nil })
@@ -78,7 +78,7 @@ func Test_Exec_GlobalCancel(t *testing.T) {
7878

7979
// get a new deferredErrGroupScheduler with a 1 second timeout.
8080
// functions will be executed after one second delay.
81-
group, ctx := NewDeferredErrGroup(ctx, time.Second)
81+
group, ctx := New(ctx, NewTimeoutTrigger(time.Second))
8282

8383
// defer execute func that sets value of c to 1.
8484
group.Go(ctx, func() error { c = 1; return nil })
@@ -115,7 +115,7 @@ func Test_Exec_CancelOneFunc(t *testing.T) {
115115

116116
// get a new deferredErrGroupScheduler with a 1 second timeout.
117117
// functions will be executed after one second delay.
118-
group, ctx := NewDeferredErrGroup(ctx, time.Second)
118+
group, ctx := New(ctx, NewTimeoutTrigger(time.Second))
119119

120120
// defer execute func setting value of c to be 1.
121121
key := group.Go(ctx, func() error { c[0] = 1; return nil })
@@ -154,7 +154,7 @@ func Test_Exec_OneFuncErrorsOut(t *testing.T) {
154154

155155
// get a new deferredErrGroupScheduler with a 1 second timeout.
156156
// functions will be executed after one second delay.
157-
group, ctx := NewDeferredErrGroup(ctx, time.Second)
157+
group, ctx := New(ctx, NewTimeoutTrigger(time.Second))
158158

159159
// defer execute func setting value of c to be 1.
160160
group.Go(ctx, func() error { c[0] = 1; return fmt.Errorf("to err is human") })

types.go

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package scheduler
2+
3+
import (
4+
"context"
5+
"sync"
6+
7+
"golang.org/x/sync/errgroup"
8+
)
9+
10+
type Trigger struct {
11+
value interface{}
12+
}
13+
14+
// deferredErrGroupScheduler implements Scheduler interface providing deferred execution for a func.
15+
type deferredErrGroupScheduler struct {
16+
// trig is trigger indicating how user wants function to be triggered.
17+
// trigger is either a timeout or an event defined by cancellation of a context.
18+
// Use NewTimeoutTrigger or NewEvent to create options in New method.
19+
trig Trigger
20+
// m is map of cancel func that can be accessed using keys.
21+
m map[string]context.CancelFunc
22+
// mu is a lock for modifying above map.
23+
mu sync.Mutex
24+
// eg is error group
25+
eg *errgroup.Group
26+
}

0 commit comments

Comments
 (0)