Skip to content

Commit 166b653

Browse files
committed
Reuse context.Context for the entire batch
This is needed to support database transactions. After LoadResource succeeded the context cannot be canceled, because in the database driver there might be a running go-routine dedicated for transaction which will automatically roll back the transaction once context is canceled.
1 parent a17ed95 commit 166b653

File tree

2 files changed

+52
-13
lines changed

2 files changed

+52
-13
lines changed

batch_test.go

+42
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,48 @@ func TestProcessor_Run(t *testing.T) {
271271
// cleanup
272272
slowOperationStopped.Done()
273273
})
274+
275+
t.Run("should use same context for LoadResource and SaveResource", func(t *testing.T) {
276+
loadResourceContext := FutureValue[context.Context]()
277+
saveResourceContext := FutureValue[context.Context]()
278+
279+
processor := batch.StartProcessor(batch.Options[empty]{
280+
LoadResource: func(ctx context.Context, key string) (empty, error) {
281+
loadResourceContext.Set(ctx)
282+
return empty{}, nil
283+
},
284+
SaveResource: func(ctx context.Context, key string, r empty) error {
285+
saveResourceContext.Set(ctx)
286+
return nil
287+
},
288+
MinDuration: time.Millisecond,
289+
})
290+
// when
291+
_ = processor.Run(context.Background(), "key", func(empty) {})
292+
// then
293+
assert.Same(t, loadResourceContext.Get(t), saveResourceContext.Get(t))
294+
})
295+
296+
t.Run("should cancel context passed to LoadResource once Run finished", func(t *testing.T) {
297+
loadResourceContext := FutureValue[context.Context]()
298+
299+
processor := batch.StartProcessor(batch.Options[empty]{
300+
LoadResource: func(ctx context.Context, key string) (empty, error) {
301+
loadResourceContext.Set(ctx)
302+
return empty{}, nil
303+
},
304+
MinDuration: time.Millisecond,
305+
MaxDuration: time.Minute,
306+
})
307+
// when
308+
_ = processor.Run(context.Background(), "key", func(empty) {})
309+
// then
310+
select {
311+
case <-loadResourceContext.Get(t).Done():
312+
case <-time.After(time.Second):
313+
assert.Fail(t, "Timeout waiting for canceling the context")
314+
}
315+
})
274316
}
275317

276318
func TestProcessor_Stop(t *testing.T) {

goroutine.go

+10-13
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,24 @@ type batch[Resource any] struct {
2121
}
2222

2323
func (b *batch[Resource]) process() {
24+
hardDeadlineContext, cancel := context.WithDeadline(context.Background(), b.hardDeadline)
25+
defer cancel()
26+
2427
softDeadlineReached := time.NewTimer(b.softDeadline.Sub(time.Now()))
2528
defer softDeadlineReached.Stop()
2629

2730
for {
2831
select {
2932
case <-b.stopped:
30-
b.end()
33+
b.end(hardDeadlineContext)
3134
return
3235

3336
case <-softDeadlineReached.C:
34-
b.end()
37+
b.end(hardDeadlineContext)
3538
return
3639

3740
case _operation := <-b.incomingOperations:
38-
err := b.load()
41+
err := b.load(hardDeadlineContext)
3942
if err != nil {
4043
_operation.result <- err
4144
return
@@ -47,36 +50,30 @@ func (b *batch[Resource]) process() {
4750
}
4851
}
4952

50-
func (b *batch[Resource]) end() {
53+
func (b *batch[Resource]) end(ctx context.Context) {
5154
if b.resource == nil {
5255
return
5356
}
5457

55-
err := b.save()
58+
err := b.save(ctx)
5659
for _, result := range b.results {
5760
result <- err
5861
}
5962
}
6063

61-
func (b *batch[Resource]) save() error {
62-
ctx, cancel := context.WithDeadline(context.Background(), b.hardDeadline)
63-
defer cancel()
64-
64+
func (b *batch[Resource]) save(ctx context.Context) error {
6565
if err := b.SaveResource(ctx, b.resourceKey, *b.resource); err != nil {
6666
return err
6767
}
6868

6969
return nil
7070
}
7171

72-
func (b *batch[Resource]) load() error {
72+
func (b *batch[Resource]) load(ctx context.Context) error {
7373
if b.alreadyLoaded() {
7474
return nil
7575
}
7676

77-
ctx, cancel := context.WithDeadline(context.Background(), b.hardDeadline)
78-
defer cancel()
79-
8077
resource, err := b.LoadResource(ctx, b.resourceKey)
8178
if err != nil {
8279
return err

0 commit comments

Comments
 (0)