Skip to content

Commit 01560f5

Browse files
committed
close dropped connections and ping added connections on balancer.Update()
1 parent 26a84de commit 01560f5

File tree

6 files changed

+201
-27
lines changed

6 files changed

+201
-27
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Pinged new connections on discovery attempt, closed dropped ones, so `ydb_go_sdk_ydb_driver_conns` metric is correct
2+
13
## v3.108.3
24
* Fixed handling of zero values for DyNumber
35
* Fixed the decimal yql slice bounds out of range

internal/balancer/balancer.go

+34-26
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,34 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context, cc *grpc.ClientC
175175
return nil
176176
}
177177

178+
func buildConnectionsState(ctx context.Context, pool interface {
179+
GetIfPresent(endpoint endpoint.Endpoint) conn.Conn
180+
Allow(ctx context.Context, cc conn.Conn)
181+
EndpointsToConnections(endpoints []endpoint.Endpoint) []conn.Conn
182+
}, newest []endpoint.Endpoint,
183+
dropped []endpoint.Endpoint,
184+
config balancerConfig.Config,
185+
selfLocation balancerConfig.Info,
186+
) *connectionsState {
187+
connections := pool.EndpointsToConnections(newest)
188+
for _, c := range connections {
189+
pool.Allow(ctx, c)
190+
c.Endpoint().Touch()
191+
_ = c.Ping(ctx)
192+
}
193+
194+
state := newConnectionsState(connections, config.Filter, selfLocation, config.AllowFallback)
195+
196+
for _, e := range dropped {
197+
c := pool.GetIfPresent(e)
198+
if c != nil {
199+
_ = c.Close(ctx)
200+
}
201+
}
202+
203+
return state
204+
}
205+
178206
func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoint.Endpoint, localDC string) {
179207
var (
180208
onDone = trace.DriverOnBalancerUpdate(
@@ -186,10 +214,12 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi
186214
)
187215
previous = b.connections().All()
188216
)
217+
218+
_, added, dropped := xslices.Diff(previous, newest, func(lhs, rhs endpoint.Endpoint) int {
219+
return strings.Compare(lhs.Address(), rhs.Address())
220+
})
221+
189222
defer func() {
190-
_, added, dropped := xslices.Diff(previous, newest, func(lhs, rhs endpoint.Endpoint) int {
191-
return strings.Compare(lhs.Address(), rhs.Address())
192-
})
193223
onDone(
194224
xslices.Transform(newest, func(t endpoint.Endpoint) trace.EndpointInfo { return t }),
195225
xslices.Transform(added, func(t endpoint.Endpoint) trace.EndpointInfo { return t }),
@@ -198,21 +228,8 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi
198228
)
199229
}()
200230

201-
connections := endpointsToConnections(b.pool, newest)
202-
for _, c := range connections {
203-
b.pool.Allow(ctx, c)
204-
c.Endpoint().Touch()
205-
}
206-
207231
info := balancerConfig.Info{SelfLocation: localDC}
208-
state := newConnectionsState(connections, b.balancerConfig.Filter, info, b.balancerConfig.AllowFallback)
209-
210-
endpointsInfo := make([]endpoint.Info, len(newest))
211-
for i, e := range newest {
212-
endpointsInfo[i] = e
213-
}
214-
215-
b.connectionsState.Store(state)
232+
b.connectionsState.Store(buildConnectionsState(ctx, b.pool, newest, dropped, b.balancerConfig, info))
216233
}
217234

218235
func (b *Balancer) Close(ctx context.Context) (err error) {
@@ -444,12 +461,3 @@ func (b *Balancer) nextConn(ctx context.Context) (c conn.Conn, err error) {
444461

445462
return c, nil
446463
}
447-
448-
func endpointsToConnections(p *conn.Pool, endpoints []endpoint.Endpoint) []conn.Conn {
449-
conns := make([]conn.Conn, 0, len(endpoints))
450-
for _, e := range endpoints {
451-
conns = append(conns, p.Get(e))
452-
}
453-
454-
return conns
455-
}

internal/balancer/balancer_test.go

+124
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package balancer
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
9+
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/mock"
13+
)
14+
15+
type fakePool struct {
16+
connections map[string]*mock.Conn
17+
}
18+
19+
func (fp *fakePool) EndpointsToConnections(eps []endpoint.Endpoint) []conn.Conn {
20+
var conns []conn.Conn
21+
for _, ep := range eps {
22+
if c, ok := fp.connections[ep.Address()]; ok {
23+
conns = append(conns, c)
24+
}
25+
}
26+
27+
return conns
28+
}
29+
30+
func (fp *fakePool) Allow(_ context.Context, _ conn.Conn) {}
31+
32+
func (fp *fakePool) GetIfPresent(ep endpoint.Endpoint) conn.Conn {
33+
if c, ok := fp.connections[ep.Address()]; ok {
34+
return c
35+
}
36+
37+
return nil
38+
}
39+
40+
func TestBuildConnectionsState(t *testing.T) {
41+
ctx := context.Background()
42+
43+
tests := []struct {
44+
name string
45+
newEndpoints []endpoint.Endpoint
46+
oldEndpoints []endpoint.Endpoint
47+
initialConns map[string]*mock.Conn
48+
conf balancerConfig.Config
49+
selfLoc balancerConfig.Info
50+
expectPinged []string
51+
expectClosed []string
52+
}{
53+
{
54+
name: "single new and old endpoint",
55+
newEndpoints: []endpoint.Endpoint{&mock.Endpoint{AddrField: "127.0.0.1"}},
56+
oldEndpoints: []endpoint.Endpoint{&mock.Endpoint{AddrField: "127.0.0.2"}},
57+
initialConns: map[string]*mock.Conn{
58+
"127.0.0.1": {
59+
AddrField: "127.0.0.1",
60+
State: conn.Online,
61+
},
62+
"127.0.0.2": {
63+
AddrField: "127.0.0.2",
64+
State: conn.Offline,
65+
},
66+
},
67+
conf: balancerConfig.Config{
68+
AllowFallback: true,
69+
DetectNearestDC: true,
70+
},
71+
selfLoc: balancerConfig.Info{SelfLocation: "local"},
72+
expectPinged: []string{"127.0.0.1"},
73+
expectClosed: []string{"127.0.0.2"},
74+
},
75+
{
76+
newEndpoints: []endpoint.Endpoint{&mock.Endpoint{AddrField: "a1"}, &mock.Endpoint{AddrField: "a2"}},
77+
oldEndpoints: []endpoint.Endpoint{&mock.Endpoint{AddrField: "a3"}},
78+
initialConns: map[string]*mock.Conn{
79+
"a1": {
80+
AddrField: "a1",
81+
LocationField: "local",
82+
State: conn.Offline,
83+
},
84+
"a2": {
85+
AddrField: "a2",
86+
State: conn.Offline,
87+
},
88+
"a3": {
89+
AddrField: "a3",
90+
State: conn.Online,
91+
},
92+
},
93+
conf: balancerConfig.Config{
94+
AllowFallback: true,
95+
DetectNearestDC: true,
96+
},
97+
selfLoc: balancerConfig.Info{SelfLocation: "local"},
98+
expectPinged: []string{"a1", "a2"},
99+
expectClosed: []string{"a3"},
100+
},
101+
}
102+
103+
for _, tt := range tests {
104+
t.Run(tt.name, func(t *testing.T) {
105+
fp := &fakePool{connections: make(map[string]*mock.Conn)}
106+
for addr, c := range tt.initialConns {
107+
fp.connections[addr] = c
108+
}
109+
110+
state := buildConnectionsState(ctx, fp, tt.newEndpoints, tt.oldEndpoints, tt.conf, tt.selfLoc)
111+
assert.NotNil(t, state)
112+
for _, addr := range tt.expectPinged {
113+
c := fp.connections[addr]
114+
assert.True(t, c.Pinged.Load(), "connection %s should be pinged", addr)
115+
assert.True(t, c.State == conn.Online || c.PingErr != nil)
116+
}
117+
for _, addr := range tt.expectClosed {
118+
c := fp.connections[addr]
119+
assert.True(t, c.Closed.Load(), "connection %s should be closed", addr)
120+
assert.True(t, c.State == conn.Offline, "connection %s should be offline", addr)
121+
}
122+
})
123+
}
124+
}

internal/conn/conn.go

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"google.golang.org/grpc/metadata"
1414
"google.golang.org/grpc/stats"
1515

16+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
1617
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1718
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
1819
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
@@ -36,6 +37,7 @@ var (
3637

3738
type Conn interface {
3839
grpc.ClientConnInterface
40+
closer.Closer
3941

4042
Endpoint() endpoint.Endpoint
4143

internal/conn/pool.go

+23
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,20 @@ func (p *Pool) GrpcDialOptions() []grpc.DialOption {
4040
return p.dialOptions
4141
}
4242

43+
func (p *Pool) GetIfPresent(endpoint endpoint.Endpoint) Conn {
44+
var (
45+
address = endpoint.Address()
46+
cc *conn
47+
has bool
48+
)
49+
50+
if cc, has = p.conns.Get(address); has {
51+
return cc
52+
}
53+
54+
return nil
55+
}
56+
4357
func (p *Pool) Get(endpoint endpoint.Endpoint) Conn {
4458
var (
4559
address = endpoint.Address()
@@ -252,3 +266,12 @@ func NewPool(ctx context.Context, config Config) *Pool {
252266

253267
return p
254268
}
269+
270+
func (p *Pool) EndpointsToConnections(endpoints []endpoint.Endpoint) []Conn {
271+
conns := make([]Conn, 0, len(endpoints))
272+
for _, e := range endpoints {
273+
conns = append(conns, p.Get(e))
274+
}
275+
276+
return conns
277+
}

internal/mock/conn.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package mock
22

33
import (
44
"context"
5+
"sync/atomic"
56
"time"
67

78
"google.golang.org/grpc"
@@ -17,6 +18,8 @@ type Conn struct {
1718
NodeIDField uint32
1819
State conn.State
1920
LocalDCField bool
21+
Pinged atomic.Bool
22+
Closed atomic.Bool
2023
}
2124

2225
func (c *Conn) Invoke(
@@ -53,7 +56,19 @@ func (c *Conn) Park(ctx context.Context) (err error) {
5356
panic("not implemented in mock")
5457
}
5558

59+
func (c *Conn) Close(ctx context.Context) error {
60+
c.Closed.Store(true)
61+
c.SetState(ctx, conn.Offline)
62+
63+
return nil
64+
}
65+
5666
func (c *Conn) Ping(ctx context.Context) error {
67+
c.Pinged.Store(true)
68+
if c.PingErr == nil {
69+
c.SetState(ctx, conn.Online)
70+
}
71+
5772
return c.PingErr
5873
}
5974

@@ -116,7 +131,7 @@ func (e *Endpoint) LoadFactor() float32 {
116131
}
117132

118133
func (e *Endpoint) OverrideHost() string {
119-
panic("not implemented in mock")
134+
return ""
120135
}
121136

122137
func (e *Endpoint) String() string {

0 commit comments

Comments
 (0)