Skip to content

Commit a78bced

Browse files
committed
improve spawner and fix races
1 parent 40cbe4f commit a78bced

File tree

4 files changed

+57
-52
lines changed

4 files changed

+57
-52
lines changed

internal/pool/defaults.go

-14
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,7 @@
11
package pool
22

3-
import "time"
4-
53
const (
64
DefaultLimit = 50
7-
8-
// defaultCreateRetryDelay specifies the amount of time that the spawner will wait
9-
// before retrying an unsuccessful item creation.
10-
defaultCreateRetryDelay = 500 * time.Millisecond
11-
12-
// defaultSpawnGoroutinesNumber specifies the number of spawnItems goroutines.
13-
// Having more than one spawner can potentially decrease warm-up time
14-
// and connections re-establishment time after connectivity failure.
15-
// Too high value will result in frequent connection establishment
16-
// attempts (see defaultCreateRetryDelay) during connectivity
17-
// issues which in some cases might not be desirable.
18-
defaultSpawnGoroutinesNumber = 2
195
)
206

217
var defaultTrace = &Trace{

internal/pool/errors.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,4 @@ import (
44
"errors"
55
)
66

7-
var (
8-
errClosedPool = errors.New("closed pool")
9-
errItemIsNotAlive = errors.New("item is not alive")
10-
)
7+
var errClosedPool = errors.New("closed pool")

internal/pool/pool.go

+53-31
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pool
22

33
import (
44
"context"
5+
"sync"
56
"time"
67

78
"golang.org/x/sync/errgroup"
@@ -60,6 +61,10 @@ type (
6061
done chan struct{}
6162

6263
stats *safeStats
64+
65+
spawnCancel context.CancelFunc
66+
67+
wg *sync.WaitGroup
6368
}
6469
option[PT Item[T], T any] func(p *Pool[PT, T])
6570
)
@@ -202,9 +207,11 @@ func New[PT Item[T], T any](
202207
onChange: p.trace.OnChange,
203208
}
204209

205-
for i := 0; i < defaultSpawnGoroutinesNumber; i++ {
206-
go p.spawnItems(ctx)
207-
}
210+
var spawnCtx context.Context
211+
p.wg = &sync.WaitGroup{}
212+
spawnCtx, p.spawnCancel = xcontext.WithCancel(xcontext.ValueOnly(ctx))
213+
p.wg.Add(1)
214+
go p.spawnItems(spawnCtx)
208215

209216
return p
210217
}
@@ -213,45 +220,55 @@ func New[PT Item[T], T any](
213220
// It ensures that the pool always has a number of connections equal to the configured limit.
214221
// If item creation ends with an error, it will be retried indefinitely with the configured interval until it succeeds.
215222
func (p *Pool[PT, T]) spawnItems(ctx context.Context) {
216-
spawnLoop:
223+
defer p.wg.Done()
217224
for {
218225
select {
219226
case <-ctx.Done():
220-
break spawnLoop
227+
return
221228
case <-p.done:
222-
break spawnLoop
229+
return
223230
case <-p.itemTokens:
224231
// got token, must create item
232+
createLoop:
225233
for {
226-
item, err := p.createItem(ctx)
227-
if err != nil {
228-
select {
229-
case <-ctx.Done():
230-
break spawnLoop
231-
case <-p.done:
232-
break spawnLoop
233-
case <-time.After(defaultCreateRetryDelay):
234-
// try again.
235-
// token must always result in new item and not be lost.
234+
select {
235+
case <-ctx.Done():
236+
return
237+
case <-p.done:
238+
return
239+
default:
240+
p.wg.Add(1)
241+
err := p.trySpawn(ctx)
242+
if err == nil {
243+
break createLoop
236244
}
237-
} else {
238-
// item is created successfully, put it in queue
239-
select {
240-
case <-ctx.Done():
241-
break spawnLoop
242-
case <-p.done:
243-
break spawnLoop
244-
case p.queue <- item:
245-
p.stats.Idle().Inc()
246-
}
247-
248-
continue spawnLoop
249245
}
246+
// spawn was unsuccessful, need to try again.
247+
// token must always result in new item and not be lost.
250248
}
251249
}
252250
}
253251
}
254252

253+
func (p *Pool[PT, T]) trySpawn(ctx context.Context) error {
254+
defer p.wg.Done()
255+
item, err := p.createItem(ctx)
256+
if err != nil {
257+
return err
258+
}
259+
// item was created successfully, put it in queue
260+
select {
261+
case <-ctx.Done():
262+
return nil
263+
case <-p.done:
264+
return nil
265+
case p.queue <- item:
266+
p.stats.Idle().Inc()
267+
}
268+
269+
return nil
270+
}
271+
255272
// defaultCreateItem returns a new item
256273
func defaultCreateItem[T any, PT Item[T]](ctx context.Context) (PT, error) {
257274
var item T
@@ -365,15 +382,13 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (_ PT, finalErr error) {
365382
if item != nil {
366383
if item.IsAlive() {
367384
// item is alive, return it
368-
p.stats.InUse().Inc()
369385

370386
return item, nil
371387
}
372388
// item is not alive
373389
_ = p.closeItem(ctx, item) // clean up dead item
374390
}
375391
p.itemTokens <- struct{}{} // signal spawn goroutine to create a new item
376-
377392
// and try again
378393
}
379394
}
@@ -400,7 +415,6 @@ func (p *Pool[PT, T]) putItem(ctx context.Context, item PT) (finalErr error) {
400415
case <-ctx.Done():
401416
return xerrors.WithStackTrace(ctx.Err())
402417
default:
403-
p.stats.InUse().Dec()
404418
if item.IsAlive() {
405419
// put back in the queue
406420
select {
@@ -455,9 +469,11 @@ func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item
455469

456470
return xerrors.WithStackTrace(err)
457471
}
472+
p.stats.InUse().Inc()
458473

459474
defer func() {
460475
_ = p.putItem(ctx, item)
476+
p.stats.InUse().Dec()
461477
}()
462478

463479
err = f(ctx, item)
@@ -519,10 +535,16 @@ func (p *Pool[PT, T]) Close(ctx context.Context) (finalErr error) {
519535
})
520536
}()
521537

538+
// canceling spawner (and any underlying createItem calls)
539+
p.spawnCancel()
540+
522541
// Only closing done channel.
523542
// Due to multiple senders queue is not closed here,
524543
// we're just making sure to drain it fully to close any existing item.
525544
close(p.done)
545+
546+
p.wg.Wait()
547+
526548
var g errgroup.Group
527549
shutdownLoop:
528550
for {

internal/pool/pool_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package pool
33
import (
44
"context"
55
"errors"
6-
"fmt"
76
"math/rand"
87
"sync"
98
"sync/atomic"
@@ -68,8 +67,10 @@ func (t *testItemV2) Close(context.Context) error {
6867
}
6968

7069
func (t *testItemV2) failAfter(d time.Duration) {
71-
<-time.After(d)
70+
timer := time.NewTimer(d)
71+
<-timer.C
7272
atomic.CompareAndSwapInt32(&t.dead, 0, 1)
73+
timer.Stop()
7374
}
7475

7576
func TestPool(t *testing.T) {
@@ -339,7 +340,6 @@ func TestPool(t *testing.T) {
339340
atomic.AddInt64(&newItems, 1)
340341
v := &testItem{
341342
onClose: func() error {
342-
fmt.Println("close call")
343343
atomic.AddInt64(&deleteItems, 1)
344344

345345
return nil

0 commit comments

Comments
 (0)