Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use buffered channel to manage items in query pool #1320

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/pool/defaults.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package pool

const DefaultLimit = 50
const (
DefaultLimit = 50
)

var defaultTrace = &Trace{
OnNew: func(info *NewStartInfo) func(info *NewDoneInfo) {
Expand Down
5 changes: 1 addition & 4 deletions internal/pool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,4 @@ import (
"errors"
)

var (
errClosedPool = errors.New("closed pool")
errItemIsNotAlive = errors.New("item is not alive")
)
var errClosedPool = errors.New("closed pool")
244 changes: 155 additions & 89 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pool

import (
"context"
"sync"
"time"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -38,12 +39,32 @@ type (
createTimeout time.Duration
closeTimeout time.Duration

mu xsync.Mutex
idle []PT
index map[PT]struct{}
done chan struct{}
// queue is a buffered channel that holds ready-to-use items.
// Newly created items are sent to this channel by spawner goroutine.
// getItem reads from this channel to get items for usage.
// putItems sends item to this channel when it's no longer needed.
// Len of the buffered channel should be equal to configured pool size
// (MUST NOT be less).
// If item is in this queue, then it's considered idle (not in use).
queue chan PT

// itemTokens similarly to 'queue' is a buffered channel, and it holds 'tokens'.
// Presence of token in this channel indicates that there's requests to create item.
// Every token will eventually result in creation of new item (spawnItems makes sure of that).
//
// itemTokens must have same size as queue.
// Sum of every existing token plus sum of every existing item in any time MUST be equal
// to pool size. New token MUST be added by getItem/putItem if they discovered item in use to be
// no good and discarded it.
itemTokens chan struct{}

done chan struct{}

stats *safeStats

spawnCancel context.CancelFunc

wg *sync.WaitGroup
}
option[PT Item[T], T any] func(p *Pool[PT, T])
)
Expand Down Expand Up @@ -159,6 +180,15 @@ func New[PT Item[T], T any](
}
}

p.queue = make(chan PT, p.limit)
p.itemTokens = make(chan struct{}, p.limit)
go func() {
// fill tokens
for i := 0; i < p.limit; i++ {
p.itemTokens <- struct{}{}
}
}()

onDone := p.trace.OnNew(&NewStartInfo{
Context: &ctx,
Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/pool.New"),
Expand All @@ -172,16 +202,73 @@ func New[PT Item[T], T any](

p.createItem = createItemWithTimeoutHandling(p.createItem, p)

p.idle = make([]PT, 0, p.limit)
p.index = make(map[PT]struct{}, p.limit)
p.stats = &safeStats{
v: stats.Stats{Limit: p.limit},
onChange: p.trace.OnChange,
}

var spawnCtx context.Context
p.wg = &sync.WaitGroup{}
spawnCtx, p.spawnCancel = xcontext.WithCancel(xcontext.ValueOnly(ctx))
p.wg.Add(1)
go p.spawnItems(spawnCtx)

return p
}

// spawnItems creates one item per each available itemToken and sends new item to internal item queue.
// It ensures that pool would always have amount of connections equal to configured limit.
// If item creation ended with error it will be retried infinity with configured interval until success.
func (p *Pool[PT, T]) spawnItems(ctx context.Context) {
defer p.wg.Done()
for {
select {
case <-ctx.Done():
return
case <-p.done:
return
case <-p.itemTokens:
// got token, must create item
createLoop:
for {
select {
case <-ctx.Done():
return
case <-p.done:
return
default:
p.wg.Add(1)
err := p.trySpawn(ctx)
if err == nil {
break createLoop
}
}
// spawn was unsuccessful, need to try again.
// token must always result in new item and not be lost.
}
}
}
}

func (p *Pool[PT, T]) trySpawn(ctx context.Context) error {
defer p.wg.Done()
item, err := p.createItem(ctx)
if err != nil {
return err
}
// item was created successfully, put it in queue
select {
case <-ctx.Done():
return nil
case <-p.done:
return nil
case p.queue <- item:
p.stats.Idle().Inc()
}

return nil
}

// defaultCreateItem returns a new item
func defaultCreateItem[T any, PT Item[T]](ctx context.Context) (PT, error) {
var item T
Expand Down Expand Up @@ -247,31 +334,12 @@ func createItemWithContext[PT Item[T], T any](
return xerrors.WithStackTrace(err)
}

needCloseItem := true
defer func() {
if needCloseItem {
_ = p.closeItem(ctx, newItem)
}
}()

select {
case <-p.done:
return xerrors.WithStackTrace(errClosedPool)
case <-ctx.Done():
p.mu.Lock()
defer p.mu.Unlock()

if len(p.index) < p.limit {
p.idle = append(p.idle, newItem)
p.index[newItem] = struct{}{}
p.stats.Index().Inc()
needCloseItem = false
}

return xerrors.WithStackTrace(ctx.Err())
case ch <- newItem:
needCloseItem = false

return nil
}
}
Expand All @@ -280,6 +348,10 @@ func (p *Pool[PT, T]) Stats() stats.Stats {
return p.stats.Get()
}

// getItem retrieves item from the queue.
// If retrieved item happens to be not alive, then it's destroyed
// and tokens queue is filled to +1 so new item can be created by spawner goroutine.
// After, the process will be repeated until alive item is retrieved.
func (p *Pool[PT, T]) getItem(ctx context.Context) (_ PT, finalErr error) {
onDone := p.trace.OnGet(&GetStartInfo{
Context: &ctx,
Expand All @@ -295,48 +367,30 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (_ PT, finalErr error) {
return nil, xerrors.WithStackTrace(err)
}

select {
case <-p.done:
return nil, xerrors.WithStackTrace(errClosedPool)
case <-ctx.Done():
return nil, xerrors.WithStackTrace(ctx.Err())
default:
var item PT
p.mu.WithLock(func() {
if len(p.idle) > 0 {
item, p.idle = p.idle[0], p.idle[1:]
p.stats.Idle().Dec()
}
})

if item != nil {
if item.IsAlive() {
return item, nil
}
_ = p.closeItem(ctx, item)
p.mu.WithLock(func() {
delete(p.index, item)
})
p.stats.Index().Dec()
}

item, err := p.createItem(ctx)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
// get item and ensure it's alive.
// Infinite loop here guarantees that we either return alive item
// or block infinitely until we have one.
// It is assumed that calling code should use context if it wishes to time out the call.
for {
select {
case <-p.done:
return nil, xerrors.WithStackTrace(errClosedPool)
case <-ctx.Done():
return nil, xerrors.WithStackTrace(ctx.Err())
case item := <-p.queue: // get or wait for item
p.stats.Idle().Dec()
if item != nil {
if item.IsAlive() {
// item is alive, return it

addedToIndex := false
p.mu.WithLock(func() {
if len(p.index) < p.limit {
p.index[item] = struct{}{}
addedToIndex = true
return item, nil
}
// item is not alive
_ = p.closeItem(ctx, item) // clean up dead item
}
})
if addedToIndex {
p.stats.Index().Inc()
p.itemTokens <- struct{}{} // signal spawn goroutine to create a new item
// and try again
}

return item, nil
}
}

Expand All @@ -358,25 +412,28 @@ func (p *Pool[PT, T]) putItem(ctx context.Context, item PT) (finalErr error) {
select {
case <-p.done:
return xerrors.WithStackTrace(errClosedPool)
case <-ctx.Done():
return xerrors.WithStackTrace(ctx.Err())
default:
if !item.IsAlive() {
if item.IsAlive() {
// put back in the queue
select {
case <-p.done:
return xerrors.WithStackTrace(errClosedPool)
case <-ctx.Done():
return xerrors.WithStackTrace(ctx.Err())
case p.queue <- item:
p.stats.Idle().Inc()
}
} else {
// item is not alive
// add token and close
p.itemTokens <- struct{}{}
_ = p.closeItem(ctx, item)

p.mu.WithLock(func() {
delete(p.index, item)
})
p.stats.Index().Dec()

return xerrors.WithStackTrace(errItemIsNotAlive)
}

p.mu.WithLock(func() {
p.idle = append(p.idle, item)
})
p.stats.Idle().Inc()

return nil
}

return nil
}

func (p *Pool[PT, T]) closeItem(ctx context.Context, item PT) error {
Expand Down Expand Up @@ -412,14 +469,13 @@ func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item

return xerrors.WithStackTrace(err)
}
p.stats.InUse().Inc()

defer func() {
_ = p.putItem(ctx, item)
p.stats.InUse().Dec()
}()

p.stats.InUse().Inc()
defer p.stats.InUse().Dec()

err = f(ctx, item)
if err != nil {
return xerrors.WithStackTrace(err)
Expand Down Expand Up @@ -479,17 +535,27 @@ func (p *Pool[PT, T]) Close(ctx context.Context) (finalErr error) {
})
}()

// canceling spawner (and any underlying createItem calls)
p.spawnCancel()

// Only closing done channel.
// Due to multiple senders queue is not closed here,
// we're just making sure to drain it fully to close any existing item.
close(p.done)

p.mu.Lock()
defer p.mu.Unlock()
p.wg.Wait()

var g errgroup.Group
for item := range p.index {
item := item
g.Go(func() error {
return item.Close(ctx)
})
shutdownLoop:
for {
select {
case item := <-p.queue:
g.Go(func() error {
return item.Close(ctx)
})
default:
break shutdownLoop
}
}
if err := g.Wait(); err != nil {
return xerrors.WithStackTrace(err)
Expand Down
Loading
Loading