diff --git a/pool/connection_pool.go b/pool/connection_pool.go index a47ec19a..e62cb2b3 100644 --- a/pool/connection_pool.go +++ b/pool/connection_pool.go @@ -1085,7 +1085,12 @@ func (p *ConnectionPool) getConnectionRole(conn *tarantool.Connection) (Role, er return UnknownRole, ErrIncorrectResponse } - instanceStatus, ok := data[0].(map[interface{}]interface{})["status"] + respFields, ok := data[0].(map[interface{}]interface{}) + if !ok { + return UnknownRole, ErrIncorrectResponse + } + + instanceStatus, ok := respFields["status"] if !ok { return UnknownRole, ErrIncorrectResponse } @@ -1093,7 +1098,7 @@ func (p *ConnectionPool) getConnectionRole(conn *tarantool.Connection) (Role, er return UnknownRole, ErrIncorrectStatus } - replicaRole, ok := data[0].(map[interface{}]interface{})[roFieldName] + replicaRole, ok := respFields[roFieldName] if !ok { return UnknownRole, ErrIncorrectResponse } diff --git a/pool/connection_pool_test.go b/pool/connection_pool_test.go index f3bf5f55..8120c613 100644 --- a/pool/connection_pool_test.go +++ b/pool/connection_pool_test.go @@ -1115,7 +1115,10 @@ func TestConnectionHandlerOpenUpdateClose(t *testing.T) { poolInstances := makeInstances(poolServers, connOpts) roles := []bool{false, true} - err := test_helpers.SetClusterRO(makeDialers(poolServers), connOpts, roles) + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + + err := test_helpers.SetClusterRO(ctx, makeDialers(poolServers), connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") h := &testHandler{} @@ -1123,8 +1126,6 @@ func TestConnectionHandlerOpenUpdateClose(t *testing.T) { CheckTimeout: 100 * time.Microsecond, ConnectionHandler: h, } - ctx, cancel := test_helpers.GetPoolConnectContext() - defer cancel() connPool, err := pool.ConnectWithOpts(ctx, poolInstances, poolOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1249,7 +1250,10 @@ func TestConnectionHandlerUpdateError(t *testing.T) { poolInstances := makeInstances(poolServers, connOpts) roles := []bool{false, false} - err := test_helpers.SetClusterRO(makeDialers(poolServers), connOpts, roles) + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + + err := test_helpers.SetClusterRO(ctx, makeDialers(poolServers), connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") h := &testUpdateErrorHandler{} @@ -1257,8 +1261,6 @@ func TestConnectionHandlerUpdateError(t *testing.T) { CheckTimeout: 100 * time.Microsecond, ConnectionHandler: h, } - ctx, cancel := test_helpers.GetPoolConnectContext() - defer cancel() connPool, err := pool.ConnectWithOpts(ctx, poolInstances, poolOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1324,7 +1326,10 @@ func TestConnectionHandlerDeactivated_on_remove(t *testing.T) { poolInstances := makeInstances(poolServers, connOpts) roles := []bool{false, false} - err := test_helpers.SetClusterRO(makeDialers(poolServers), connOpts, roles) + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + + err := test_helpers.SetClusterRO(ctx, makeDialers(poolServers), connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") h := &testDeactivatedErrorHandler{} @@ -1332,8 +1337,6 @@ func TestConnectionHandlerDeactivated_on_remove(t *testing.T) { CheckTimeout: 100 * time.Microsecond, ConnectionHandler: h, } - ctx, cancel := test_helpers.GetPoolConnectContext() - defer cancel() connPool, err := pool.ConnectWithOpts(ctx, poolInstances, poolOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1413,11 +1416,12 @@ func TestRequestOnClosed(t *testing.T) { func TestCall(t *testing.T) { roles := []bool{false, true, false, false, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1472,11 +1476,12 @@ func TestCall(t *testing.T) { func TestCall16(t *testing.T) { roles := []bool{false, true, false, false, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1531,11 +1536,12 @@ func TestCall16(t *testing.T) { func TestCall17(t *testing.T) { roles := []bool{false, true, false, false, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1590,11 +1596,12 @@ func TestCall17(t *testing.T) { func TestEval(t *testing.T) { roles := []bool{false, true, false, false, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1670,11 +1677,12 @@ func TestExecute(t *testing.T) { roles := []bool{false, true, false, false, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1728,11 +1736,12 @@ func TestRoundRobinStrategy(t *testing.T) { serversNumber := len(servers) - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1807,11 +1816,12 @@ func TestRoundRobinStrategy_NoReplica(t *testing.T) { servers[4]: true, } - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1880,11 +1890,12 @@ func TestRoundRobinStrategy_NoMaster(t *testing.T) { servers[4]: true, } - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1965,11 +1976,12 @@ func TestUpdateInstancesRoles(t *testing.T) { serversNumber := len(servers) - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2044,7 +2056,9 @@ func TestUpdateInstancesRoles(t *testing.T) { servers[3]: true, } - err = test_helpers.SetClusterRO(dialers, connOpts, roles) + ctxSetRoles, cancelSetRoles := test_helpers.GetPoolConnectContext() + err = test_helpers.SetClusterRO(ctxSetRoles, dialers, connOpts, roles) + cancelSetRoles() require.Nilf(t, err, "fail to set roles for cluster") // ANY @@ -2111,11 +2125,12 @@ func TestUpdateInstancesRoles(t *testing.T) { func TestInsert(t *testing.T) { roles := []bool{true, true, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2210,11 +2225,12 @@ func TestInsert(t *testing.T) { func TestDelete(t *testing.T) { roles := []bool{true, true, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2274,11 +2290,12 @@ func TestDelete(t *testing.T) { func TestUpsert(t *testing.T) { roles := []bool{true, true, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2346,11 +2363,12 @@ func TestUpsert(t *testing.T) { func TestUpdate(t *testing.T) { roles := []bool{true, true, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2435,11 +2453,12 @@ func TestUpdate(t *testing.T) { func TestReplace(t *testing.T) { roles := []bool{true, true, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2520,11 +2539,12 @@ func TestReplace(t *testing.T) { func TestSelect(t *testing.T) { roles := []bool{true, true, false, true, false} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2543,13 +2563,13 @@ func TestSelect(t *testing.T) { rwKey := []interface{}{"rw_select_key"} anyKey := []interface{}{"any_select_key"} - err = test_helpers.InsertOnInstances(makeDialers(roServers), connOpts, spaceNo, roTpl) + err = test_helpers.InsertOnInstances(ctx, makeDialers(roServers), connOpts, spaceNo, roTpl) require.Nil(t, err) - err = test_helpers.InsertOnInstances(makeDialers(rwServers), connOpts, spaceNo, rwTpl) + err = test_helpers.InsertOnInstances(ctx, makeDialers(rwServers), connOpts, spaceNo, rwTpl) require.Nil(t, err) - err = test_helpers.InsertOnInstances(makeDialers(allServers), connOpts, spaceNo, anyTpl) + err = test_helpers.InsertOnInstances(ctx, makeDialers(allServers), connOpts, spaceNo, anyTpl) require.Nil(t, err) //default: ANY @@ -2642,11 +2662,12 @@ func TestSelect(t *testing.T) { func TestPing(t *testing.T) { roles := []bool{true, true, false, true, false} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2681,11 +2702,12 @@ func TestPing(t *testing.T) { func TestDo(t *testing.T) { roles := []bool{true, true, false, true, false} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2717,11 +2739,12 @@ func TestDo(t *testing.T) { func TestDo_concurrent(t *testing.T) { roles := []bool{true, true, false, true, false} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2766,12 +2789,12 @@ func TestDoInstance(t *testing.T) { func TestDoInstance_not_found(t *testing.T) { roles := []bool{true, true, false, true, false} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, []pool.Instance{}) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2820,11 +2843,12 @@ func TestNewPrepared(t *testing.T) { roles := []bool{true, true, false, true, false} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2892,11 +2916,12 @@ func TestDoWithStrangerConn(t *testing.T) { roles := []bool{true, true, false, true, false} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2922,11 +2947,12 @@ func TestStream_Commit(t *testing.T) { roles := []bool{true, true, false, true, true} - err = test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err = test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -3013,11 +3039,12 @@ func TestStream_Rollback(t *testing.T) { roles := []bool{true, true, false, true, true} - err = test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err = test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -3103,11 +3130,12 @@ func TestStream_TxnIsolationLevel(t *testing.T) { roles := []bool{true, true, false, true, true} - err = test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err = test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -3214,11 +3242,12 @@ func TestConnectionPool_NewWatcher_modes(t *testing.T) { roles := []bool{true, false, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -3291,14 +3320,15 @@ func TestConnectionPool_NewWatcher_update(t *testing.T) { const initCnt = 2 roles := []bool{true, false, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") poolOpts := pool.Opts{ CheckTimeout: 500 * time.Millisecond, } - ctx, cancel := test_helpers.GetPoolConnectContext() - defer cancel() pool, err := pool.ConnectWithOpts(ctx, instances, poolOpts) require.Nilf(t, err, "failed to connect") @@ -3338,7 +3368,9 @@ func TestConnectionPool_NewWatcher_update(t *testing.T) { for i, role := range roles { roles[i] = !role } - err = test_helpers.SetClusterRO(dialers, connOpts, roles) + ctxSetRoles, cancelSetRoles := test_helpers.GetPoolConnectContext() + err = test_helpers.SetClusterRO(ctxSetRoles, dialers, connOpts, roles) + cancelSetRoles() require.Nilf(t, err, "fail to set roles for cluster") // Wait for all updated events. @@ -3376,11 +3408,12 @@ func TestWatcher_Unregister(t *testing.T) { const expectedCnt = 2 roles := []bool{true, false, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + pool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, pool, "conn is nil after Connect") @@ -3433,11 +3466,12 @@ func TestConnectionPool_NewWatcher_concurrent(t *testing.T) { roles := []bool{true, false, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -3471,11 +3505,12 @@ func TestWatcher_Unregister_concurrent(t *testing.T) { roles := []bool{true, false, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") diff --git a/pool/example_test.go b/pool/example_test.go index a4d3d4ba..dce8bb1a 100644 --- a/pool/example_test.go +++ b/pool/example_test.go @@ -23,12 +23,12 @@ var testRoles = []bool{true, true, false, true, true} func examplePool(roles []bool, connOpts tarantool.Opts) (*pool.ConnectionPool, error) { - err := test_helpers.SetClusterRO(dialers, connOpts, roles) + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) if err != nil { return nil, fmt.Errorf("ConnectionPool is not established") } - ctx, cancel := test_helpers.GetPoolConnectContext() - defer cancel() connPool, err := pool.Connect(ctx, instances) if err != nil || connPool == nil { return nil, fmt.Errorf("ConnectionPool is not established") @@ -55,12 +55,12 @@ func exampleFeaturesPool(roles []bool, connOpts tarantool.Opts, }) poolDialers = append(poolDialers, dialer) } - err := test_helpers.SetClusterRO(poolDialers, connOpts, roles) + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + err := test_helpers.SetClusterRO(ctx, poolDialers, connOpts, roles) if err != nil { return nil, fmt.Errorf("ConnectionPool is not established") } - ctx, cancel := test_helpers.GetPoolConnectContext() - defer cancel() connPool, err := pool.Connect(ctx, poolInstances) if err != nil || connPool == nil { return nil, fmt.Errorf("ConnectionPool is not established") diff --git a/queue/example_connection_pool_test.go b/queue/example_connection_pool_test.go index 355a491e..a126e13a 100644 --- a/queue/example_connection_pool_test.go +++ b/queue/example_connection_pool_test.go @@ -212,7 +212,9 @@ func Example_connectionPool() { // Switch a master instance in the pool. roles := []bool{true, false} for { - err := test_helpers.SetClusterRO(poolDialers, connOpts, roles) + ctx, cancel := test_helpers.GetPoolConnectContext() + err := test_helpers.SetClusterRO(ctx, poolDialers, connOpts, roles) + cancel() if err == nil { break } diff --git a/queue/queue_test.go b/queue/queue_test.go index e43c4711..840c18b4 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -954,7 +954,9 @@ func runTestMain(m *testing.M) int { }) } - err = test_helpers.SetClusterRO(dialers, connOpts, roles) + ctx, cancel := test_helpers.GetPoolConnectContext() + err = test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + cancel() if err == nil { break } diff --git a/test_helpers/pool_helper.go b/test_helpers/pool_helper.go index b67d05f0..599e5659 100644 --- a/test_helpers/pool_helper.go +++ b/test_helpers/pool_helper.go @@ -2,8 +2,10 @@ package test_helpers import ( "context" + "errors" "fmt" "reflect" + "sync" "time" "github.com/tarantool/go-tarantool/v2" @@ -164,6 +166,7 @@ func InsertOnInstance(ctx context.Context, dialer tarantool.Dialer, connOpts tar } func InsertOnInstances( + ctx context.Context, dialers []tarantool.Dialer, connOpts tarantool.Opts, space interface{}, @@ -174,21 +177,24 @@ func InsertOnInstances( roles[i] = false } - err := SetClusterRO(dialers, connOpts, roles) + err := SetClusterRO(ctx, dialers, connOpts, roles) if err != nil { return fmt.Errorf("fail to set roles for cluster: %s", err.Error()) } - for _, dialer := range dialers { - ctx, cancel := GetConnectContext() - err := InsertOnInstance(ctx, dialer, connOpts, space, tuple) - cancel() - if err != nil { - return err - } + errs := make([]error, len(dialers)) + var wg sync.WaitGroup + wg.Add(len(dialers)) + for i, dialer := range dialers { + // Pass loop variable(s) to avoid its capturing by reference (not needed since Go 1.22). + go func(i int, dialer tarantool.Dialer) { + defer wg.Done() + errs[i] = InsertOnInstance(ctx, dialer, connOpts, space, tuple) + }(i, dialer) } + wg.Wait() - return nil + return errors.Join(errs...) } func SetInstanceRO(ctx context.Context, dialer tarantool.Dialer, connOpts tarantool.Opts, @@ -206,25 +212,73 @@ func SetInstanceRO(ctx context.Context, dialer tarantool.Dialer, connOpts tarant return err } + checkRole := func(conn *tarantool.Connection, isReplica bool) string { + data, err := conn.Do(tarantool.NewCallRequest("box.info")).Get() + switch { + case err != nil: + return fmt.Sprintf("failed to get box.info: %s", err) + case len(data) < 1: + return "box.info is empty" + } + + boxInfo, ok := data[0].(map[interface{}]interface{}) + if !ok { + return "unexpected type in box.info response" + } + + status, statusFound := boxInfo["status"] + readonly, readonlyFound := boxInfo["ro"] + switch { + case !statusFound: + return "box.info.status is missing" + case status != "running": + return fmt.Sprintf("box.info.status='%s' (waiting for 'running')", status) + case !readonlyFound: + return "box.info.ro is missing" + case readonly != isReplica: + return fmt.Sprintf("box.info.ro='%v' (waiting for '%v')", readonly, isReplica) + default: + return "" + } + } + + problem := "not checked yet" + + // Wait for the role to be applied. + for len(problem) != 0 { + select { + case <-time.After(10 * time.Millisecond): + case <-ctx.Done(): + return fmt.Errorf("%w: failed to apply role, the last problem: %s", + ctx.Err(), problem) + } + + problem = checkRole(conn, isReplica) + } + return nil } -func SetClusterRO(dialers []tarantool.Dialer, connOpts tarantool.Opts, +func SetClusterRO(ctx context.Context, dialers []tarantool.Dialer, connOpts tarantool.Opts, roles []bool) error { if len(dialers) != len(roles) { return fmt.Errorf("number of servers should be equal to number of roles") } + // Apply roles in parallel. + errs := make([]error, len(dialers)) + var wg sync.WaitGroup + wg.Add(len(dialers)) for i, dialer := range dialers { - ctx, cancel := GetConnectContext() - err := SetInstanceRO(ctx, dialer, connOpts, roles[i]) - cancel() - if err != nil { - return err - } + // Pass loop variable(s) to avoid its capturing by reference (not needed since Go 1.22). + go func(i int, dialer tarantool.Dialer) { + defer wg.Done() + errs[i] = SetInstanceRO(ctx, dialer, connOpts, roles[i]) + }(i, dialer) } + wg.Wait() - return nil + return errors.Join(errs...) } func StartTarantoolInstances(instsOpts []StartOpts) ([]*TarantoolInstance, error) {