Skip to content

Commit 870ccfc

Browse files
committed
Add a way to drop operation
Client code executing Processor.Run might want to abort running operation if the operation was not yet run in a batch. Such situation is possible when there is a high request congestion. From now on Processor.Run will accept new parameter context.Context. This context could be cancelled by the client effectively dropping the operation if it was still waiting to be run. Example: ``` ctx := context.WithTimeout(context.Background(), 5 * time.Second) err := processor.Run(ctx, "key", ...) // err will be OperationCancelled ```
1 parent 0b2e530 commit 870ccfc

File tree

7 files changed

+71
-27
lines changed

7 files changed

+71
-27
lines changed

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ processor := batch.StartProcessor(
6060
)
6161

6262
// And use the processor inside http/grpc handler or technology-agnostic service.
63-
// ResourceKey can be taken from request parameter.
64-
err := processor.Run(resourceKey, func(r *YourResource) {
63+
// ctx is a standard context.Context and resourceKey can be taken from request parameter
64+
err := processor.Run(ctx, resourceKey, func(r *YourResource) {
6565
// Here you put the code which will executed sequentially inside batch
6666
})
6767
```

_example/http/http.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package http
55

66
import (
7+
"context"
78
"errors"
89
"fmt"
910
"net/http"
@@ -13,7 +14,7 @@ import (
1314
)
1415

1516
type TrainService interface {
16-
Book(train string, seatNumber int, person string) error
17+
Book(ctx context.Context, train string, seatNumber int, person string) error
1718
}
1819

1920
func ListenAndServe(trainService TrainService) error {
@@ -41,7 +42,7 @@ func bookHandler(trainService TrainService) func(http.ResponseWriter, *http.Requ
4142
return
4243
}
4344

44-
err = trainService.Book(trainKey, seat, person)
45+
err = trainService.Book(request.Context(), trainKey, seat, person)
4546

4647
if errors.Is(err, train.ErrValidation("")) {
4748
writer.WriteHeader(http.StatusBadRequest)
@@ -51,7 +52,7 @@ func bookHandler(trainService TrainService) func(http.ResponseWriter, *http.Requ
5152

5253
if err != nil {
5354
writer.WriteHeader(http.StatusInternalServerError)
54-
fmt.Println("internal server error", err)
55+
fmt.Println("internal server error:", err)
5556
return
5657
}
5758

_example/train/service.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,21 @@
33

44
package train
55

6+
import "context"
7+
68
// BatchProcessor is an optional interface to decouple your code from `batch` package.
79
type BatchProcessor interface {
8-
Run(key string, operation func(*Train)) error
10+
Run(ctx context.Context, key string, operation func(*Train)) error
911
}
1012

1113
type Service struct {
1214
BatchProcessor BatchProcessor
1315
}
1416

15-
func (s Service) Book(trainName string, seatNumber int, person string) error {
17+
func (s Service) Book(ctx context.Context, trainName string, seatNumber int, person string) error {
1618
var operationError error
1719

18-
batchError := s.BatchProcessor.Run(trainName, func(train *Train) {
20+
batchError := s.BatchProcessor.Run(ctx, trainName, func(train *Train) {
1921
operationError = train.Book(seatNumber, person)
2022
})
2123

batch.go

+12-4
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type Options[Resource any] struct {
3232
// SaveResource saves resource with given key to a database. Returning an error aborts the batch.
3333
// This function is called at the end of each batch. Context passed as a first parameter
3434
// has a timeout calculated using batch MaxDuration. You can use this information to abort saving resource
35-
// if it takes too long.
35+
// if it takes too long (thus aborting the entire batch).
3636
//
3737
// By default, does nothing.
3838
SaveResource func(_ context.Context, key string, _ Resource) error
@@ -139,7 +139,10 @@ func (s Options[Resource]) withDefaults() Options[Resource] {
139139
// Run ends when the entire batch has ended. It returns error when batch is aborted or processor is stopped.
140140
// Only LoadResource and SaveResource functions can abort the batch by returning an error. If error was reported
141141
// for a batch all Run calls assigned to this batch will get this error.
142-
func (p *Processor[Resource]) Run(key string, _operation func(Resource)) error {
142+
//
143+
// Operation which is still waiting to be run can be canceled by cancelling ctx. If operation was executed but batch
144+
// is pending then Run waits until batch ends. When ctx is cancelled then OperationCancelled error is returned.
145+
func (p *Processor[Resource]) Run(ctx context.Context, key string, _operation func(Resource)) error {
143146
select {
144147
case <-p.stopped:
145148
return ProcessorStopped
@@ -151,13 +154,18 @@ func (p *Processor[Resource]) Run(key string, _operation func(Resource)) error {
151154

152155
goRoutineNumber := p.options.GoRoutineNumberForKey(key, p.options.GoRoutines)
153156

154-
p.workerChannels[goRoutineNumber] <- operation[Resource]{
157+
o := operation[Resource]{
155158
resourceKey: key,
156159
run: _operation,
157160
result: result,
158161
}
159162

160-
return <-result
163+
select {
164+
case p.workerChannels[goRoutineNumber] <- o:
165+
return <-result
166+
case <-ctx.Done():
167+
return OperationCancelled
168+
}
161169
}
162170

163171
// Stop ends all running batches. No new operations will be accepted.

batch_bench_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package batch_test
22

33
import (
4+
"context"
45
"strconv"
56
"sync"
67
"testing"
@@ -31,7 +32,7 @@ func BenchmarkProcessor_Run(b *testing.B) {
3132
key := strconv.Itoa(i % resourceCount)
3233
go func() {
3334
// when
34-
err := processor.Run(key, operation)
35+
err := processor.Run(context.Background(), key, operation)
3536
require.NoError(b, err)
3637
allOperationsFinished.Done()
3738
}()

batch_test.go

+45-14
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,16 @@ import (
1818
"github.com/elgopher/batch"
1919
)
2020

21+
var noTimeout = context.Background()
22+
2123
func TestProcessor_Run(t *testing.T) {
2224
t.Run("should run callback on zero-value resource when LoadResource was not provided", func(t *testing.T) {
2325
futureValue := FutureValue[*resource]()
2426

2527
processor := batch.StartProcessor(batch.Options[*resource]{})
2628
defer processor.Stop()
2729
// when
28-
err := processor.Run("key", func(c *resource) {
30+
err := processor.Run(noTimeout, "key", func(c *resource) {
2931
futureValue.Set(c)
3032
})
3133
require.NoError(t, err)
@@ -47,7 +49,7 @@ func TestProcessor_Run(t *testing.T) {
4749
})
4850
defer processor.Stop()
4951
// when
50-
err := processor.Run(key, func(r *resource) {
52+
err := processor.Run(noTimeout, key, func(r *resource) {
5153
futureValue.Set(r)
5254
})
5355
require.NoError(t, err)
@@ -68,7 +70,7 @@ func TestProcessor_Run(t *testing.T) {
6870
})
6971
defer processor.Stop()
7072
// when
71-
err := processor.Run(key, func(r *resource) {
73+
err := processor.Run(noTimeout, key, func(r *resource) {
7274
r.value = 2
7375
})
7476
require.NoError(t, err)
@@ -90,7 +92,7 @@ func TestProcessor_Run(t *testing.T) {
9092

9193
started := time.Now()
9294
// when
93-
err := processor.Run("", func(empty) {})
95+
err := processor.Run(noTimeout, "", func(empty) {})
9496
require.NoError(t, err)
9597

9698
elapsed := time.Now().Sub(started)
@@ -103,7 +105,7 @@ func TestProcessor_Run(t *testing.T) {
103105

104106
started := time.Now()
105107
// when
106-
err := processor.Run("", func(empty) {})
108+
err := processor.Run(noTimeout, "", func(empty) {})
107109
require.NoError(t, err)
108110

109111
elapsed := time.Now().Sub(started)
@@ -126,12 +128,12 @@ func TestProcessor_Run(t *testing.T) {
126128

127129
key := ""
128130

129-
err := processor.Run(key, func(empty) {
131+
err := processor.Run(noTimeout, key, func(empty) {
130132
time.Sleep(100 * time.Millisecond)
131133
})
132134
require.NoError(t, err)
133135

134-
err = processor.Run(key, func(empty) {})
136+
err = processor.Run(noTimeout, key, func(empty) {})
135137
require.NoError(t, err)
136138

137139
assert.Equal(t, int32(2), atomic.LoadInt32(&batchCount))
@@ -154,7 +156,7 @@ func TestProcessor_Run(t *testing.T) {
154156
)
155157
defer processor.Stop()
156158
// when
157-
err := processor.Run(key, func(*resource) {})
159+
err := processor.Run(noTimeout, key, func(*resource) {})
158160
// then
159161
assert.ErrorIs(t, err, customError)
160162
// and
@@ -181,7 +183,7 @@ func TestProcessor_Run(t *testing.T) {
181183
)
182184
defer processor.Stop()
183185
// when
184-
err := processor.Run(key, func(*resource) {})
186+
err := processor.Run(noTimeout, key, func(*resource) {})
185187
// then
186188
assert.ErrorIs(t, err, timeoutError)
187189
// and
@@ -206,7 +208,7 @@ func TestProcessor_Run(t *testing.T) {
206208
)
207209
defer processor.Stop()
208210
// when
209-
err := processor.Run(key, func(empty) {})
211+
err := processor.Run(noTimeout, key, func(empty) {})
210212
// then
211213
assert.ErrorIs(t, err, timeoutError)
212214
})
@@ -229,7 +231,7 @@ func TestProcessor_Run(t *testing.T) {
229231

230232
for i := 0; i < iterations; i++ {
231233
go func() {
232-
err := processor.Run("key", func(r *resource) {
234+
err := processor.Run(noTimeout, "key", func(r *resource) {
233235
r.value++ // value is not guarded so data race should be reported by `go test`
234236
})
235237
require.NoError(t, err)
@@ -240,14 +242,43 @@ func TestProcessor_Run(t *testing.T) {
240242

241243
group.Wait()
242244
})
245+
246+
t.Run("should cancel operation if operation is still waiting to be run", func(t *testing.T) {
247+
processor := batch.StartProcessor(batch.Options[empty]{})
248+
defer processor.Stop()
249+
250+
var slowOperationStarted sync.WaitGroup
251+
slowOperationStarted.Add(1)
252+
253+
var slowOperationStopped sync.WaitGroup
254+
slowOperationStopped.Add(1)
255+
256+
key := "key"
257+
258+
go processor.Run(noTimeout, key, func(empty) {
259+
slowOperationStarted.Done()
260+
slowOperationStopped.Wait()
261+
})
262+
263+
slowOperationStarted.Wait()
264+
265+
ctx, cancel := context.WithCancel(context.Background())
266+
// when
267+
cancel()
268+
err := processor.Run(ctx, key, func(empty) {})
269+
// then
270+
assert.ErrorIs(t, err, batch.OperationCancelled)
271+
// cleanup
272+
slowOperationStopped.Done()
273+
})
243274
}
244275

245276
func TestProcessor_Stop(t *testing.T) {
246277
t.Run("after Stop no new operation can be run", func(t *testing.T) {
247278
processor := batch.StartProcessor(batch.Options[empty]{})
248279
processor.Stop()
249280

250-
err := processor.Run("key", func(empty) {})
281+
err := processor.Run(noTimeout, "key", func(empty) {})
251282
assert.ErrorIs(t, err, batch.ProcessorStopped)
252283
})
253284

@@ -269,7 +300,7 @@ func TestProcessor_Stop(t *testing.T) {
269300
started := time.Now()
270301

271302
go func() {
272-
err := processor.Run("key", func(empty) {
303+
err := processor.Run(noTimeout, "key", func(empty) {
273304
operationExecuted.Done()
274305
})
275306
require.NoError(t, err)
@@ -301,7 +332,7 @@ func TestProcessor_Stop(t *testing.T) {
301332
},
302333
)
303334
go func() {
304-
_ = processor.Run("key", func(empty) {
335+
_ = processor.Run(noTimeout, "key", func(empty) {
305336
operationExecuted.Done()
306337
})
307338
}()

error.go

+1
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ package batch
66
import "errors"
77

88
var ProcessorStopped = errors.New("run failed: processor is stopped")
9+
var OperationCancelled = errors.New("run failed: operation was canceled before it was run")

0 commit comments

Comments
 (0)