Skip to content

Commit 79c9c83

Browse files
committed
WIP
1 parent abe3bf3 commit 79c9c83

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+35188
-1455
lines changed

balancers/balancers.go

+25-94
Original file line numberDiff line numberDiff line change
@@ -1,154 +1,85 @@
11
package balancers
22

33
import (
4-
"sort"
5-
"strings"
6-
74
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
85
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
9-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xstring"
106
)
117

128
// Deprecated: RoundRobin is RandomChoice now
139
func RoundRobin() *balancerConfig.Config {
14-
return &balancerConfig.Config{}
10+
return balancerConfig.New()
1511
}
1612

1713
func RandomChoice() *balancerConfig.Config {
18-
return &balancerConfig.Config{}
14+
return balancerConfig.New()
1915
}
2016

2117
func SingleConn() *balancerConfig.Config {
22-
return &balancerConfig.Config{
23-
SingleConn: true,
24-
}
25-
}
26-
27-
type filterLocalDC struct{}
28-
29-
func (filterLocalDC) Allow(info balancerConfig.Info, c conn.Info) bool {
30-
return c.Endpoint().Location() == info.SelfLocation
31-
}
32-
33-
func (filterLocalDC) String() string {
34-
return "LocalDC"
18+
return balancerConfig.New(balancerConfig.UseSingleConn())
3519
}
3620

3721
// PreferLocalDC creates balancer which use endpoints only in location such as initial endpoint location
3822
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
3923
// PreferLocalDC balancer try to autodetect local DC from client side.
4024
func PreferLocalDC(balancer *balancerConfig.Config) *balancerConfig.Config {
41-
balancer.Filter = filterLocalDC{}
42-
balancer.DetectLocalDC = true
43-
44-
return balancer
25+
return balancer.With(
26+
balancerConfig.FilterLocalDC(),
27+
balancerConfig.DetectLocalDC(),
28+
)
4529
}
4630

4731
// PreferLocalDCWithFallBack creates balancer which use endpoints only in location such as initial endpoint location
4832
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
4933
// If filter returned zero endpoints from all discovery endpoints list - used all endpoint instead
5034
func PreferLocalDCWithFallBack(balancer *balancerConfig.Config) *balancerConfig.Config {
51-
balancer = PreferLocalDC(balancer)
52-
balancer.AllowFallback = true
53-
54-
return balancer
55-
}
56-
57-
type filterLocations []string
58-
59-
func (locations filterLocations) Allow(_ balancerConfig.Info, c conn.Info) bool {
60-
location := strings.ToUpper(c.Endpoint().Location())
61-
for _, l := range locations {
62-
if location == l {
63-
return true
64-
}
65-
}
66-
67-
return false
68-
}
69-
70-
func (locations filterLocations) String() string {
71-
buffer := xstring.Buffer()
72-
defer buffer.Free()
73-
74-
buffer.WriteString("Locations{")
75-
for i, l := range locations {
76-
if i != 0 {
77-
buffer.WriteByte(',')
78-
}
79-
buffer.WriteString(l)
80-
}
81-
buffer.WriteByte('}')
82-
83-
return buffer.String()
35+
return PreferLocalDC(balancer).With(balancerConfig.AllowFallback())
8436
}
8537

8638
// PreferLocations creates balancer which use endpoints only in selected locations (such as "ABC", "DEF", etc.)
8739
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
8840
func PreferLocations(balancer *balancerConfig.Config, locations ...string) *balancerConfig.Config {
89-
if len(locations) == 0 {
90-
panic("empty list of locations")
91-
}
92-
for i := range locations {
93-
locations[i] = strings.ToUpper(locations[i])
94-
}
95-
sort.Strings(locations)
96-
balancer.Filter = filterLocations(locations)
97-
98-
return balancer
41+
return balancer.With(balancerConfig.FilterLocations(locations...))
9942
}
10043

10144
// PreferLocationsWithFallback creates balancer which use endpoints only in selected locations
10245
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
10346
// If filter returned zero endpoints from all discovery endpoints list - used all endpoint instead
10447
func PreferLocationsWithFallback(balancer *balancerConfig.Config, locations ...string) *balancerConfig.Config {
105-
balancer = PreferLocations(balancer, locations...)
106-
balancer.AllowFallback = true
107-
108-
return balancer
48+
return balancer.With(
49+
balancerConfig.FilterLocations(locations...),
50+
balancerConfig.AllowFallback(),
51+
)
10952
}
11053

11154
type Endpoint interface {
11255
NodeID() uint32
11356
Address() string
11457
Location() string
115-
116-
// Deprecated: LocalDC check "local" by compare endpoint location with discovery "selflocation" field.
117-
// It work good only if connection url always point to local dc.
118-
LocalDC() bool
119-
}
120-
121-
type filterFunc func(info balancerConfig.Info, c conn.Info) bool
122-
123-
func (p filterFunc) Allow(info balancerConfig.Info, c conn.Info) bool {
124-
return p(info, c)
125-
}
126-
127-
func (p filterFunc) String() string {
128-
return "Custom"
12958
}
13059

13160
// Prefer creates balancer which use endpoints by filter
13261
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter
13362
func Prefer(balancer *balancerConfig.Config, filter func(endpoint Endpoint) bool) *balancerConfig.Config {
134-
balancer.Filter = filterFunc(func(_ balancerConfig.Info, c conn.Info) bool {
135-
return filter(c.Endpoint())
136-
})
137-
138-
return balancer
63+
return balancer.With(
64+
balancerConfig.FilterFunc(func(_ balancerConfig.Info, c conn.Info) bool {
65+
return filter(c.Endpoint())
66+
}),
67+
)
13968
}
14069

14170
// PreferWithFallback creates balancer which use endpoints by filter
14271
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter
14372
// If filter returned zero endpoints from all discovery endpoints list - used all endpoint instead
14473
func PreferWithFallback(balancer *balancerConfig.Config, filter func(endpoint Endpoint) bool) *balancerConfig.Config {
145-
balancer = Prefer(balancer, filter)
146-
balancer.AllowFallback = true
147-
148-
return balancer
74+
return balancer.With(
75+
balancerConfig.FilterFunc(func(_ balancerConfig.Info, c conn.Info) bool {
76+
return filter(c.Endpoint())
77+
}),
78+
balancerConfig.AllowFallback(),
79+
)
14980
}
15081

15182
// Default balancer used by default
15283
func Default() *balancerConfig.Config {
153-
return RandomChoice()
84+
return balancerConfig.New()
15485
}

balancers/balancers_test.go

+18-20
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"testing"
55

66
"github.com/stretchr/testify/require"
7+
"google.golang.org/grpc/connectivity"
78

89
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
910
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
@@ -12,57 +13,54 @@ import (
1213

1314
func TestPreferLocalDC(t *testing.T) {
1415
conns := []conn.Info{
15-
&mock.ConnInfo{EndpointAddrField: "1", EndpointLocationField: "1"},
16-
&mock.ConnInfo{EndpointAddrField: "2", ConnState: conn.Online, EndpointLocationField: "2"},
17-
&mock.ConnInfo{EndpointAddrField: "3", ConnState: conn.Online, EndpointLocationField: "2"},
16+
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "1", LocationField: "1"}},
17+
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "2", LocationField: "2"}, StateField: connectivity.Ready},
18+
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "3", LocationField: "2"}, StateField: connectivity.Ready},
1819
}
1920
rr := PreferLocalDC(RandomChoice())
20-
require.False(t, rr.AllowFallback)
21+
require.False(t, rr.AllowFallback())
2122
require.Equal(t, []conn.Info{conns[1], conns[2]}, applyPreferFilter(balancerConfig.Info{SelfLocation: "2"}, rr, conns))
2223
}
2324

2425
func TestPreferLocalDCWithFallBack(t *testing.T) {
2526
conns := []conn.Info{
26-
&mock.ConnInfo{EndpointAddrField: "1", EndpointLocationField: "1"},
27-
&mock.ConnInfo{EndpointAddrField: "2", ConnState: conn.Online, EndpointLocationField: "2"},
28-
&mock.ConnInfo{EndpointAddrField: "3", ConnState: conn.Online, EndpointLocationField: "2"},
27+
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "1", LocationField: "1"}},
28+
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "2", LocationField: "2"}, StateField: connectivity.Ready},
29+
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "3", LocationField: "2"}, StateField: connectivity.Ready},
2930
}
3031
rr := PreferLocalDCWithFallBack(RandomChoice())
31-
require.True(t, rr.AllowFallback)
32+
require.True(t, rr.AllowFallback())
3233
require.Equal(t, []conn.Info{conns[1], conns[2]}, applyPreferFilter(balancerConfig.Info{SelfLocation: "2"}, rr, conns))
3334
}
3435

3536
func TestPreferLocations(t *testing.T) {
3637
conns := []conn.Info{
37-
&mock.ConnInfo{EndpointAddrField: "1", EndpointLocationField: "zero", ConnState: conn.Online},
38-
&mock.ConnInfo{EndpointAddrField: "2", ConnState: conn.Online, EndpointLocationField: "one"},
39-
&mock.ConnInfo{EndpointAddrField: "3", ConnState: conn.Online, EndpointLocationField: "two"},
38+
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "1", LocationField: "zero"}, StateField: connectivity.Ready},
39+
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "2", LocationField: "one"}, StateField: connectivity.Ready},
40+
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "3", LocationField: "two"}, StateField: connectivity.Ready},
4041
}
4142

4243
rr := PreferLocations(RandomChoice(), "zero", "two")
43-
require.False(t, rr.AllowFallback)
44+
require.False(t, rr.AllowFallback())
4445
require.Equal(t, []conn.Info{conns[0], conns[2]}, applyPreferFilter(balancerConfig.Info{}, rr, conns))
4546
}
4647

4748
func TestPreferLocationsWithFallback(t *testing.T) {
4849
conns := []conn.Info{
49-
&mock.ConnInfo{EndpointAddrField: "1", EndpointLocationField: "zero", ConnState: conn.Online},
50-
&mock.ConnInfo{EndpointAddrField: "2", ConnState: conn.Online, EndpointLocationField: "one"},
51-
&mock.ConnInfo{EndpointAddrField: "3", ConnState: conn.Online, EndpointLocationField: "two"},
50+
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "1", LocationField: "zero"}, StateField: connectivity.Ready},
51+
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "2", LocationField: "one"}, StateField: connectivity.Ready},
52+
&mock.Conn{EndpointField: &mock.Endpoint{AddressField: "3", LocationField: "two"}, StateField: connectivity.Ready},
5253
}
5354

5455
rr := PreferLocationsWithFallback(RandomChoice(), "zero", "two")
55-
require.True(t, rr.AllowFallback)
56+
require.True(t, rr.AllowFallback())
5657
require.Equal(t, []conn.Info{conns[0], conns[2]}, applyPreferFilter(balancerConfig.Info{}, rr, conns))
5758
}
5859

5960
func applyPreferFilter(info balancerConfig.Info, b *balancerConfig.Config, conns []conn.Info) []conn.Info {
60-
if b.Filter == nil {
61-
b.Filter = filterFunc(func(info balancerConfig.Info, c conn.Info) bool { return true })
62-
}
6361
res := make([]conn.Info, 0, len(conns))
6462
for _, c := range conns {
65-
if b.Filter.Allow(info, c) {
63+
if b.Filter().Allow(info, c) {
6664
res = append(res, c)
6765
}
6866
}

0 commit comments

Comments
 (0)