Skip to content

Commit 10694d0

Browse files
author
Neven Miculinic
committed
feat(chpool): add healthcheck function
1 parent dbcdccf commit 10694d0

File tree

5 files changed

+127
-29
lines changed

5 files changed

+127
-29
lines changed

chpool/client.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package chpool
22

33
import (
44
"context"
5-
"time"
65

76
"github.com/jackc/puddle/v2"
87

@@ -21,14 +20,16 @@ func (c *Client) Release() {
2120
return
2221
}
2322

24-
client := c.client()
25-
26-
if client.IsClosed() || time.Since(c.res.CreationTime()) > c.p.options.MaxConnLifetime {
27-
c.res.Destroy()
28-
return
29-
}
30-
31-
c.res.Release()
23+
// calling async since connIsHealthy may block
24+
go func() {
25+
if c.p.connIsHealthy(c.res) {
26+
c.p.options.ClientOptions.Logger.Debug("chpool: releasing connection")
27+
c.res.Release()
28+
} else {
29+
c.p.options.ClientOptions.Logger.Debug("chpool: destoying connection")
30+
c.res.Destroy()
31+
}
32+
}()
3233
}
3334

3435
func (c *Client) Do(ctx context.Context, q ch.Query) (err error) {

chpool/client_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@ package chpool
22

33
import (
44
"context"
5+
"sync/atomic"
56
"testing"
67

8+
"github.com/stretchr/testify/assert"
79
"github.com/stretchr/testify/require"
10+
11+
"github.com/ClickHouse/ch-go"
812
)
913

1014
func TestClient_Do(t *testing.T) {
@@ -17,6 +21,24 @@ func TestClient_Do(t *testing.T) {
1721
testDo(t, conn)
1822
}
1923

24+
func TestClient_ReleaseHealthCheck(t *testing.T) {
25+
t.Parallel()
26+
var healthCheckCnt int64
27+
p := PoolConnOpt(t, Options{
28+
HealthCheckFunc: func(ctx context.Context, client *ch.Client) error {
29+
atomic.AddInt64(&healthCheckCnt, 1)
30+
return nil
31+
},
32+
})
33+
conn, err := p.Acquire(context.Background())
34+
require.NoError(t, err)
35+
assert.Equal(t, int64(0), atomic.LoadInt64(&healthCheckCnt))
36+
37+
conn.Release()
38+
waitForReleaseToComplete()
39+
assert.Equal(t, int64(1), atomic.LoadInt64(&healthCheckCnt))
40+
}
41+
2042
func TestClient_Ping(t *testing.T) {
2143
t.Parallel()
2244
p := PoolConn(t)

chpool/conn.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
package chpool
22

33
import (
4+
"time"
5+
46
"github.com/jackc/puddle/v2"
57

68
"github.com/ClickHouse/ch-go"
79
)
810

911
type connResource struct {
10-
client *ch.Client
11-
clients []Client
12+
lastHealthCheckTimestamp time.Time
13+
client *ch.Client
14+
clients []Client
1215
}
1316

1417
func (cr *connResource) getConn(p *Pool, res *puddle.Resource[*connResource]) *Client {

chpool/pool.go

Lines changed: 66 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ import (
66
"sync"
77
"time"
88

9-
"github.com/ClickHouse/ch-go"
10-
119
"github.com/jackc/puddle/v2"
10+
"go.uber.org/zap"
11+
12+
"github.com/ClickHouse/ch-go"
1213
)
1314

1415
// Pool of connections to ClickHouse.
@@ -23,19 +24,26 @@ type Pool struct {
2324

2425
// Options for Pool.
2526
type Options struct {
26-
ClientOptions ch.Options
27-
MaxConnLifetime time.Duration
28-
MaxConnIdleTime time.Duration
29-
MaxConns int32
30-
MinConns int32
31-
HealthCheckPeriod time.Duration
27+
ClientOptions ch.Options
28+
MaxConnLifetime time.Duration
29+
MaxConnIdleTime time.Duration
30+
MaxConns int32
31+
MinConns int32
32+
HealthCheckPeriod time.Duration
33+
HealthCheckFunc func(ctx context.Context, client *ch.Client) error
34+
HealthCheckTimeout time.Duration
35+
}
36+
37+
func DefaultHealthCheckFunc(ctx context.Context, client *ch.Client) error {
38+
return client.Ping(ctx)
3239
}
3340

3441
// Defaults for pool.
3542
const (
36-
DefaultMaxConnLifetime = time.Hour
37-
DefaultMaxConnIdleTime = time.Minute * 30
38-
DefaultHealthCheckPeriod = time.Minute
43+
DefaultMaxConnLifetime = time.Hour
44+
DefaultMaxConnIdleTime = time.Minute * 30
45+
DefaultHealthCheckPeriod = time.Minute
46+
DefaultHealthCheckTimeout = time.Second
3947
)
4048

4149
func (o *Options) setDefaults() {
@@ -51,6 +59,15 @@ func (o *Options) setDefaults() {
5159
if o.HealthCheckPeriod == 0 {
5260
o.HealthCheckPeriod = DefaultHealthCheckPeriod
5361
}
62+
if o.HealthCheckFunc == nil {
63+
o.HealthCheckFunc = DefaultHealthCheckFunc
64+
}
65+
if o.HealthCheckTimeout == 0 {
66+
o.HealthCheckTimeout = DefaultHealthCheckTimeout
67+
}
68+
if o.ClientOptions.Logger == nil {
69+
o.ClientOptions.Logger = zap.NewNop()
70+
}
5471
}
5572

5673
// Dial returns a pool of connections to ClickHouse.
@@ -162,16 +179,47 @@ func (p *Pool) backgroundHealthCheck() {
162179
func (p *Pool) checkIdleConnsHealth() {
163180
resources := p.pool.AcquireAllIdle()
164181

165-
now := time.Now()
182+
wg := sync.WaitGroup{}
166183
for _, res := range resources {
167-
if now.Sub(res.CreationTime()) > p.options.MaxConnLifetime {
168-
res.Destroy()
169-
} else if res.IdleDuration() > p.options.MaxConnIdleTime {
170-
res.Destroy()
171-
} else {
172-
res.ReleaseUnused()
184+
res := res
185+
wg.Add(1)
186+
go func() {
187+
defer wg.Done()
188+
if res.IdleDuration() > p.options.MaxConnIdleTime || !p.connIsHealthy(res) {
189+
res.Destroy()
190+
} else {
191+
res.ReleaseUnused()
192+
}
193+
}()
194+
wg.Wait()
195+
}
196+
}
197+
198+
func (p *Pool) connIsHealthy(res *puddle.Resource[*connResource]) bool {
199+
logger := p.options.ClientOptions.Logger
200+
if res.Value().client.IsClosed() {
201+
logger.Debug("chpool: connection is closed")
202+
return false
203+
}
204+
205+
if time.Since(res.CreationTime()) > p.options.MaxConnLifetime {
206+
logger.Debug("chpool: connection over max lifetime")
207+
return false
208+
}
209+
210+
if p.options.HealthCheckFunc != nil {
211+
ctx, cancel := context.WithTimeout(context.Background(), p.options.HealthCheckTimeout)
212+
defer cancel()
213+
if err := p.options.HealthCheckFunc(ctx, res.Value().client); err != nil {
214+
if logger := p.options.ClientOptions.Logger; logger != nil {
215+
logger.Warn("chpool: health check failed", zap.Error(err))
216+
}
217+
return false
173218
}
174219
}
220+
221+
res.Value().lastHealthCheckTimestamp = time.Now()
222+
return true
175223
}
176224

177225
func (p *Pool) checkMinConns() {

chpool/pool_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@ package chpool
22

33
import (
44
"context"
5+
"sync/atomic"
56
"testing"
67
"time"
78

9+
"github.com/ClickHouse/ch-go"
10+
811
"github.com/stretchr/testify/assert"
912
"github.com/stretchr/testify/require"
1013
)
@@ -61,6 +64,7 @@ func TestPool_Ping(t *testing.T) {
6164
p := PoolConn(t)
6265

6366
require.NoError(t, p.Ping(context.Background()))
67+
waitForReleaseToComplete()
6468

6569
stats := p.Stat()
6670
assert.EqualValues(t, 0, stats.AcquiredResources())
@@ -78,3 +82,23 @@ func TestPool_Acquire(t *testing.T) {
7882
waitForReleaseToComplete()
7983
require.EqualValues(t, 2, p.Stat().AcquireCount())
8084
}
85+
86+
func TestPool_backgroundHealthCheck(t *testing.T) {
87+
t.Parallel()
88+
var healthCheckCnt int64
89+
p := PoolConnOpt(t, Options{
90+
MinConns: 1,
91+
HealthCheckFunc: func(ctx context.Context, client *ch.Client) error {
92+
atomic.AddInt64(&healthCheckCnt, 1)
93+
return nil
94+
},
95+
HealthCheckPeriod: 500 * time.Millisecond,
96+
})
97+
p.checkMinConns()
98+
p.checkIdleConnsHealth()
99+
assert.GreaterOrEqual(t, int64(1), atomic.LoadInt64(&healthCheckCnt))
100+
101+
hc := atomic.LoadInt64(&healthCheckCnt)
102+
time.Sleep(750 * time.Millisecond)
103+
assert.Equal(t, hc+1, atomic.LoadInt64(&healthCheckCnt))
104+
}

0 commit comments

Comments
 (0)