4
4
"context"
5
5
"fmt"
6
6
"strings"
7
+ "sync"
7
8
"sync/atomic"
8
9
9
10
"github.com/ydb-platform/ydb-go-genproto/Ydb_Discovery_V1"
@@ -127,10 +128,12 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi
127
128
)
128
129
previous = b .connections ().All ()
129
130
)
131
+
132
+ _ , added , dropped := xslices .Diff (previous , newest , func (lhs , rhs endpoint.Endpoint ) int {
133
+ return strings .Compare (lhs .Address (), rhs .Address ())
134
+ })
135
+
130
136
defer func () {
131
- _ , added , dropped := xslices .Diff (previous , newest , func (lhs , rhs endpoint.Endpoint ) int {
132
- return strings .Compare (lhs .Address (), rhs .Address ())
133
- })
134
137
onDone (
135
138
xslices .Transform (newest , func (t endpoint.Endpoint ) trace.EndpointInfo { return t }),
136
139
xslices .Transform (added , func (t endpoint.Endpoint ) trace.EndpointInfo { return t }),
@@ -145,6 +148,14 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi
145
148
c .Endpoint ().Touch ()
146
149
}
147
150
151
+ droppedConnections := endpointsToConnections (b .pool , dropped )
152
+ for _ , c := range droppedConnections {
153
+ sync .OnceFunc (func () {
154
+ b .pool .Ban (ctx , c , errEndpointNotDiscovered )
155
+ _ = c .Close (ctx )
156
+ })()
157
+ }
158
+
148
159
info := balancerConfig.Info {SelfLocation : localDC }
149
160
state := newConnectionsState (connections , b .balancerConfig .Filter , info , b .balancerConfig .AllowFallback )
150
161
0 commit comments