Skip to content

Commit f6bed41

Browse files
author
Neven Miculinic
committed
feat(chpool): Add health-check function
1 parent dbcdccf commit f6bed41

File tree

3 files changed

+67
-18
lines changed

3 files changed

+67
-18
lines changed

chpool/client.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,21 @@ func (c *Client) Release() {
2828
return
2929
}
3030

31-
c.res.Release()
31+
if time.Since(c.res.Value().lastHealthCheckTimestamp) > c.p.options.HealthCheckPeriod && c.p.options.HealthCheckFunc != nil {
32+
go func() {
33+
now := time.Now()
34+
ctx, cancel := context.WithTimeout(context.Background(), c.p.options.HealthCheckTimeout)
35+
defer cancel()
36+
if err := c.p.options.HealthCheckFunc(ctx, c.res.Value().client); err != nil {
37+
c.res.Destroy()
38+
return
39+
}
40+
c.res.Value().lastHealthCheckTimestamp = now
41+
c.res.Release()
42+
}()
43+
} else {
44+
c.res.Release()
45+
}
3246
}
3347

3448
func (c *Client) Do(ctx context.Context, q ch.Query) (err error) {

chpool/conn.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
package chpool
22

33
import (
4+
"time"
5+
46
"github.com/jackc/puddle/v2"
57

68
"github.com/ClickHouse/ch-go"
79
)
810

911
type connResource struct {
10-
client *ch.Client
11-
clients []Client
12+
lastHealthCheckTimestamp time.Time
13+
client *ch.Client
14+
clients []Client
1215
}
1316

1417
func (cr *connResource) getConn(p *Pool, res *puddle.Resource[*connResource]) *Client {

chpool/pool.go

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,26 @@ type Pool struct {
2323

2424
// Options for Pool.
2525
type Options struct {
26-
ClientOptions ch.Options
27-
MaxConnLifetime time.Duration
28-
MaxConnIdleTime time.Duration
29-
MaxConns int32
30-
MinConns int32
31-
HealthCheckPeriod time.Duration
26+
ClientOptions ch.Options
27+
MaxConnLifetime time.Duration
28+
MaxConnIdleTime time.Duration
29+
MaxConns int32
30+
MinConns int32
31+
HealthCheckPeriod time.Duration
32+
HealthCheckFunc func(ctx context.Context, client *ch.Client) error
33+
HealthCheckTimeout time.Duration
34+
}
35+
36+
func DefaultHealthCheckFunc(ctx context.Context, client *ch.Client) error {
37+
return client.Ping(ctx)
3238
}
3339

3440
// Defaults for pool.
3541
const (
36-
DefaultMaxConnLifetime = time.Hour
37-
DefaultMaxConnIdleTime = time.Minute * 30
38-
DefaultHealthCheckPeriod = time.Minute
42+
DefaultMaxConnLifetime = time.Hour
43+
DefaultMaxConnIdleTime = time.Minute * 30
44+
DefaultHealthCheckPeriod = time.Minute
45+
DefaultHealthCheckTimeout = time.Second
3946
)
4047

4148
func (o *Options) setDefaults() {
@@ -51,6 +58,12 @@ func (o *Options) setDefaults() {
5158
if o.HealthCheckPeriod == 0 {
5259
o.HealthCheckPeriod = DefaultHealthCheckPeriod
5360
}
61+
if o.HealthCheckFunc == nil {
62+
o.HealthCheckFunc = DefaultHealthCheckFunc
63+
}
64+
if o.HealthCheckTimeout == 0 {
65+
o.HealthCheckTimeout = DefaultHealthCheckTimeout
66+
}
5467
}
5568

5669
// Dial returns a pool of connections to ClickHouse.
@@ -163,15 +176,34 @@ func (p *Pool) checkIdleConnsHealth() {
163176
resources := p.pool.AcquireAllIdle()
164177

165178
now := time.Now()
179+
wg := sync.WaitGroup{}
166180
for _, res := range resources {
167-
if now.Sub(res.CreationTime()) > p.options.MaxConnLifetime {
168-
res.Destroy()
169-
} else if res.IdleDuration() > p.options.MaxConnIdleTime {
170-
res.Destroy()
171-
} else {
181+
res := res
182+
wg.Add(1)
183+
go func() {
184+
if now.Sub(res.CreationTime()) > p.options.MaxConnLifetime {
185+
res.Destroy()
186+
return
187+
}
188+
189+
if res.IdleDuration() > p.options.MaxConnIdleTime {
190+
res.Destroy()
191+
return
192+
}
193+
194+
if p.options.HealthCheckFunc != nil {
195+
ctx, cancel := context.WithTimeout(context.Background(), p.options.HealthCheckTimeout)
196+
defer cancel()
197+
if err := p.options.HealthCheckFunc(ctx, res.Value().client); err != nil {
198+
res.Destroy()
199+
return
200+
}
201+
res.Value().lastHealthCheckTimestamp = now
202+
}
172203
res.ReleaseUnused()
173-
}
204+
}()
174205
}
206+
wg.Wait()
175207
}
176208

177209
func (p *Pool) checkMinConns() {

0 commit comments

Comments
 (0)