Skip to content

Commit ada08b4

Browse files
committed
pool: fix pool.Connect if a server i/o hangs
Previously, `pool.Connect` attempted to connect to the instances one after another. This could cause the entire chain to hang if a single connection hung. Now connections are established in parallel. After the first successful connection, the remaining connection attempts are awaited with a timeout of `pool.Opts.CheckTimeout`. Closes #TNTP-2018
1 parent 07ea1d1 commit ada08b4

File tree

3 files changed

+123
-58
lines changed

3 files changed

+123
-58
lines changed

CHANGELOG.md

+8
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,18 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1212

1313
### Changed
1414

15+
- Previously, `pool.Connect` attempted to connect to the instances one after
16+
another. This could cause the entire chain to hang if a single connection
17+
hung. Now connections are established in parallel. After the first
18+
successful connection, the remaining connections wait with a timeout of
19+
`pool.Opts.CheckTimeout` (#444).
20+
1521
### Fixed
1622

1723
- Connect() may not cancel Dial() call on context expiration if network
1824
connection hangs (#443).
25+
- pool.Connect() failed to connect to any instance if the first instance's
26+
connection hangs (#444).
1927

2028
## [v2.3.1] - 2025-04-03
2129

pool/connection_pool.go

+59-41
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
171171
roPool := newRoundRobinStrategy(size)
172172
anyPool := newRoundRobinStrategy(size)
173173

174-
connPool := &ConnectionPool{
174+
p := &ConnectionPool{
175175
ends: make(map[string]*endpoint),
176176
opts: opts,
177177
state: connectedState,
@@ -181,19 +181,44 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
181181
anyPool: anyPool,
182182
}
183183

184-
canceled := connPool.fillPools(ctx, instances)
185-
if canceled {
186-
connPool.state.set(closedState)
187-
return nil, ErrContextCanceled
184+
fillCtx, fillCancel := context.WithCancel(ctx)
185+
defer fillCancel()
186+
187+
var timeout <-chan time.Time
188+
189+
timeout = make(chan time.Time)
190+
filled := p.fillPools(fillCtx, instances)
191+
done := 0
192+
success := len(instances) == 0
193+
194+
for done < len(instances) {
195+
select {
196+
case <-timeout:
197+
fillCancel()
198+
// To be sure that the branch is called only once.
199+
timeout = make(chan time.Time)
200+
case err := <-filled:
201+
done++
202+
203+
if err == nil && !success {
204+
timeout = time.After(opts.CheckTimeout)
205+
success = true
206+
}
207+
}
208+
}
209+
210+
if !success && ctx.Err() != nil {
211+
p.state.set(closedState)
212+
return nil, ctx.Err()
188213
}
189214

190-
for _, endpoint := range connPool.ends {
215+
for _, endpoint := range p.ends {
191216
endpointCtx, cancel := context.WithCancel(context.Background())
192217
endpoint.cancel = cancel
193-
go connPool.controller(endpointCtx, endpoint)
218+
go p.controller(endpointCtx, endpoint)
194219
}
195220

196-
return connPool, nil
221+
return p, nil
197222
}
198223

199224
// Connect creates pool for instances with specified instances. Instances must
@@ -1184,45 +1209,33 @@ func (p *ConnectionPool) handlerDeactivated(name string, conn *tarantool.Connect
11841209
}
11851210
}
11861211

1187-
func (p *ConnectionPool) deactivateConnection(name string,
1188-
conn *tarantool.Connection, role Role) {
1189-
p.deleteConnection(name)
1190-
conn.Close()
1191-
p.handlerDeactivated(name, conn, role)
1192-
}
1212+
func (p *ConnectionPool) fillPools(ctx context.Context, instances []Instance) <-chan error {
1213+
done := make(chan error, len(instances))
11931214

1194-
func (p *ConnectionPool) deactivateConnections() {
1195-
for name, endpoint := range p.ends {
1196-
if endpoint != nil && endpoint.conn != nil {
1197-
p.deactivateConnection(name, endpoint.conn, endpoint.role)
1198-
}
1199-
}
1200-
}
1201-
1202-
func (p *ConnectionPool) fillPools(ctx context.Context, instances []Instance) bool {
12031215
// It is called before controller() goroutines, so we don't expect
12041216
// concurrency issues here.
12051217
for _, instance := range instances {
12061218
end := newEndpoint(instance.Name, instance.Dialer, instance.Opts)
12071219
p.ends[instance.Name] = end
1220+
}
12081221

1209-
if err := p.tryConnect(ctx, end); err != nil {
1210-
log.Printf("tarantool: connect to %s failed: %s\n",
1211-
instance.Name, err)
1212-
select {
1213-
case <-ctx.Done():
1214-
p.ends[instance.Name] = nil
1215-
log.Printf("tarantool: operation was canceled")
1222+
for _, instance := range instances {
1223+
name := instance.Name
1224+
end := p.ends[name]
12161225

1217-
p.deactivateConnections()
1226+
go func() {
1227+
if err := p.tryConnect(ctx, end); err != nil {
1228+
log.Printf("tarantool: connect to %s failed: %s\n", name, err)
1229+
done <- fmt.Errorf("failed to connect to %s :%w", name, err)
12181230

1219-
return true
1220-
default:
1231+
return
12211232
}
1222-
}
1233+
1234+
done <- nil
1235+
}()
12231236
}
12241237

1225-
return false
1238+
return done
12261239
}
12271240

12281241
func (p *ConnectionPool) updateConnection(e *endpoint) {
@@ -1284,19 +1297,24 @@ func (p *ConnectionPool) updateConnection(e *endpoint) {
12841297
}
12851298

12861299
func (p *ConnectionPool) tryConnect(ctx context.Context, e *endpoint) error {
1300+
e.conn = nil
1301+
e.role = UnknownRole
1302+
1303+
connOpts := e.opts
1304+
connOpts.Notify = e.notify
1305+
conn, err := tarantool.Connect(ctx, e.dialer, connOpts)
1306+
12871307
p.poolsMutex.Lock()
12881308

12891309
if p.state.get() != connectedState {
1310+
if err == nil {
1311+
conn.Close()
1312+
}
1313+
12901314
p.poolsMutex.Unlock()
12911315
return ErrClosed
12921316
}
12931317

1294-
e.conn = nil
1295-
e.role = UnknownRole
1296-
1297-
connOpts := e.opts
1298-
connOpts.Notify = e.notify
1299-
conn, err := tarantool.Connect(ctx, e.dialer, connOpts)
13001318
if err == nil {
13011319
role, err := p.getConnectionRole(conn)
13021320
p.poolsMutex.Unlock()

pool/connection_pool_test.go

+56-17
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"fmt"
77
"log"
8+
"net"
89
"os"
910
"reflect"
1011
"strings"
@@ -141,7 +142,7 @@ func TestConnSuccessfully(t *testing.T) {
141142
}
142143

143144
err = test_helpers.CheckPoolStatuses(args)
144-
require.Nil(t, err)
145+
require.NoError(t, err)
145146
}
146147

147148
func TestConn_no_execute_supported(t *testing.T) {
@@ -261,6 +262,51 @@ func TestConnect_unavailable(t *testing.T) {
261262
}, connPool.GetInfo())
262263
}
263264

265+
func TestConnect_single_server_hang(t *testing.T) {
266+
l, err := net.Listen("tcp", "127.0.0.1:0")
267+
require.NoError(t, err)
268+
defer l.Close()
269+
270+
ctx, cancel := test_helpers.GetPoolConnectContext()
271+
defer cancel()
272+
273+
insts := makeInstances([]string{l.Addr().String()}, connOpts)
274+
275+
connPool, err := pool.Connect(ctx, insts)
276+
if connPool != nil {
277+
defer connPool.Close()
278+
}
279+
280+
require.ErrorIs(t, err, context.DeadlineExceeded)
281+
require.Nil(t, connPool)
282+
}
283+
284+
func TestConnect_server_hang(t *testing.T) {
285+
l, err := net.Listen("tcp", "127.0.0.1:0")
286+
require.NoError(t, err)
287+
defer l.Close()
288+
289+
ctx, cancel := test_helpers.GetPoolConnectContext()
290+
defer cancel()
291+
292+
servers := []string{l.Addr().String(), servers[0]}
293+
insts := makeInstances(servers, connOpts)
294+
295+
connPool, err := pool.Connect(ctx, insts)
296+
if connPool != nil {
297+
defer connPool.Close()
298+
}
299+
300+
require.NoError(t, err, "failed to create a pool")
301+
require.NotNil(t, connPool, "pool is nil after Connect")
302+
require.Equal(t, map[string]pool.ConnectionInfo{
303+
servers[0]: pool.ConnectionInfo{
304+
ConnectedNow: false, ConnRole: pool.UnknownRole, Instance: insts[0]},
305+
servers[1]: pool.ConnectionInfo{
306+
ConnectedNow: true, ConnRole: pool.MasterRole, Instance: insts[1]},
307+
}, connPool.GetInfo())
308+
}
309+
264310
func TestConnErrorAfterCtxCancel(t *testing.T) {
265311
var connLongReconnectOpts = tarantool.Opts{
266312
Timeout: 5 * time.Second,
@@ -279,15 +325,14 @@ func TestConnErrorAfterCtxCancel(t *testing.T) {
279325
if connPool != nil || err == nil {
280326
t.Fatalf("ConnectionPool was created after cancel")
281327
}
282-
if !strings.Contains(err.Error(), "operation was canceled") {
328+
if !strings.Contains(err.Error(), "context canceled") {
283329
t.Fatalf("Unexpected error, expected to contain %s, got %v",
284330
"operation was canceled", err)
285331
}
286332
}
287333

288334
type mockClosingDialer struct {
289335
addr string
290-
cnt *int
291336
ctx context.Context
292337
ctxCancel context.CancelFunc
293338
}
@@ -301,26 +346,21 @@ func (m *mockClosingDialer) Dial(ctx context.Context,
301346
}
302347
conn, err := dialer.Dial(m.ctx, tarantool.DialOpts{})
303348

304-
if *m.cnt == 0 {
305-
m.ctxCancel()
306-
}
307-
*m.cnt++
349+
m.ctxCancel()
308350

309351
return conn, err
310352
}
311353

312-
func TestContextCancelInProgress(t *testing.T) {
354+
func TestConnectContextCancelAfterConnect(t *testing.T) {
313355
ctx, cancel := context.WithCancel(context.Background())
314356
defer cancel()
315357

316-
cnt := new(int)
317358
var instances []pool.Instance
318359
for _, server := range servers {
319360
instances = append(instances, pool.Instance{
320361
Name: server,
321362
Dialer: &mockClosingDialer{
322363
addr: server,
323-
cnt: cnt,
324364
ctx: ctx,
325365
ctxCancel: cancel,
326366
},
@@ -329,11 +369,10 @@ func TestContextCancelInProgress(t *testing.T) {
329369
}
330370

331371
connPool, err := pool.Connect(ctx, instances)
332-
require.NotNilf(t, err, "expected err after ctx cancel")
333-
assert.Truef(t, strings.Contains(err.Error(), "operation was canceled"),
334-
fmt.Sprintf("unexpected error, expected to contain %s, got %v",
335-
"operation was canceled", err))
336-
require.Nilf(t, connPool, "conn is not nil after ctx cancel")
372+
assert.NoError(t, err, "expected err after ctx cancel")
373+
assert.NotNil(t, connPool)
374+
375+
connPool.Close()
337376
}
338377

339378
func TestConnSuccessfullyDuplicates(t *testing.T) {
@@ -527,8 +566,8 @@ func TestAdd(t *testing.T) {
527566
ctx, cancel := test_helpers.GetPoolConnectContext()
528567
defer cancel()
529568
connPool, err := pool.Connect(ctx, []pool.Instance{})
530-
require.Nilf(t, err, "failed to connect")
531-
require.NotNilf(t, connPool, "conn is nil after Connect")
569+
require.NoError(t, err, "failed to connect")
570+
require.NotNil(t, connPool, "conn is nil after Connect")
532571

533572
defer connPool.Close()
534573

0 commit comments

Comments
 (0)