Skip to content

Commit 4914d8a

Browse files
committed
close dropped connections and ping added connections on balancer.Update()
1 parent 4b3e883 commit 4914d8a

File tree

5 files changed

+36
-4
lines changed

5 files changed

+36
-4
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Ping new connections on discovery attempt, close dropped ones, so `ydb_go_sdk_ydb_driver_conns` metric is correct
2+
13
## v3.108.1
24
* Supported `json.Marshaller` query parameter in `database/sql` driver
35

internal/balancer/balancer.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -186,10 +186,12 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi
186186
)
187187
previous = b.connections().All()
188188
)
189+
190+
_, added, dropped := xslices.Diff(previous, newest, func(lhs, rhs endpoint.Endpoint) int {
191+
return strings.Compare(lhs.Address(), rhs.Address())
192+
})
193+
189194
defer func() {
190-
_, added, dropped := xslices.Diff(previous, newest, func(lhs, rhs endpoint.Endpoint) int {
191-
return strings.Compare(lhs.Address(), rhs.Address())
192-
})
193195
onDone(
194196
xslices.Transform(newest, func(t endpoint.Endpoint) trace.EndpointInfo { return t }),
195197
xslices.Transform(added, func(t endpoint.Endpoint) trace.EndpointInfo { return t }),
@@ -202,6 +204,7 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi
202204
for _, c := range connections {
203205
b.pool.Allow(ctx, c)
204206
c.Endpoint().Touch()
207+
_ = c.Ping(ctx)
205208
}
206209

207210
info := balancerConfig.Info{SelfLocation: localDC}
@@ -213,6 +216,13 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi
213216
}
214217

215218
b.connectionsState.Store(state)
219+
220+
for _, e := range dropped {
221+
c := b.pool.GetIfPresent(e)
222+
if c != nil {
223+
_ = c.Close(ctx)
224+
}
225+
}
216226
}
217227

218228
func (b *Balancer) Close(ctx context.Context) (err error) {

internal/conn/conn.go

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"google.golang.org/grpc/metadata"
1414
"google.golang.org/grpc/stats"
1515

16+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
1617
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1718
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
1819
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
@@ -36,6 +37,7 @@ var (
3637

3738
type Conn interface {
3839
grpc.ClientConnInterface
40+
closer.Closer
3941

4042
Endpoint() endpoint.Endpoint
4143

internal/conn/pool.go

+14
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,20 @@ func (p *Pool) GrpcDialOptions() []grpc.DialOption {
4040
return p.dialOptions
4141
}
4242

43+
func (p *Pool) GetIfPresent(endpoint endpoint.Endpoint) Conn {
44+
var (
45+
address = endpoint.Address()
46+
cc *conn
47+
has bool
48+
)
49+
50+
if cc, has = p.conns.Get(address); has {
51+
return cc
52+
}
53+
54+
return nil
55+
}
56+
4357
func (p *Pool) Get(endpoint endpoint.Endpoint) Conn {
4458
var (
4559
address = endpoint.Address()

internal/mock/conn.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func (c *Conn) Park(ctx context.Context) (err error) {
5353
panic("not implemented in mock")
5454
}
5555

56+
func (c *Conn) Close(_ context.Context) error {
57+
return nil
58+
}
59+
5660
func (c *Conn) Ping(ctx context.Context) error {
5761
return c.PingErr
5862
}
@@ -116,7 +120,7 @@ func (e *Endpoint) LoadFactor() float32 {
116120
}
117121

118122
func (e *Endpoint) OverrideHost() string {
119-
panic("not implemented in mock")
123+
return ""
120124
}
121125

122126
func (e *Endpoint) String() string {

0 commit comments

Comments
 (0)