-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathloadbalancer.go
98 lines (89 loc) · 2.18 KB
/
loadbalancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package goloadbalancer
import (
"context"
"errors"
"sync"
"sync/atomic"
)
type BalanceType string
const (
// RoundRobin 轮询
RRBalanceType BalanceType = "RR"
// WeightedRoundRobin 加权轮询
WRRBalanceType BalanceType = "WRR"
// ConsistentHash
ConsistentHashBalanceType BalanceType = "CH"
// LeastConnection 最小连接数
LCBalanceType BalanceType = "LC"
// ShortestExpectedDelay 最短期望延迟
SEDBalanceType BalanceType = "SED"
// WeightedLeastConnection
WLCBalanceType BalanceType = "WLC"
// NeverQueuejum
NQBalanceType BalanceType = "NQ"
)
type EndpointMeta interface {
Addr() string
Weight() int
}
type Endpoint interface {
Get(context.Context) (interface{}, error)
Addr() string
Close() error
Stats() Stats
AfterTransform(ctx context.Context, cn Connection)
}
type LoadBalance interface {
Select(args ...interface{}) (Endpoint, error)
AddEndpoint(endpoint interface{})
RemoveEndpoint(endpoint Endpoint)
Name() string
Close() error
GetEndpoints() []Endpoint
}
type BaseLoadBalance struct {
endpoints []Endpoint
lock sync.RWMutex
closed uint32
}
func (b *BaseLoadBalance) GetEndpoints() []Endpoint {
return b.endpoints
}
func (b *BaseLoadBalance) AddEndpoint(endpoint interface{}) {
b.lock.Lock()
defer b.lock.Unlock()
b.endpoints = append(b.endpoints, endpoint.(Endpoint))
}
func (b *BaseLoadBalance) RemoveEndpoint(endpoint Endpoint) {
b.lock.Lock()
defer b.lock.Unlock()
for i, ep := range b.endpoints {
if ep == endpoint {
b.endpoints = append(b.endpoints[:i], b.endpoints[i+1:]...)
return
}
}
}
func (b *BaseLoadBalance) Close() error {
if !atomic.CompareAndSwapUint32(&b.closed, 0, 1) {
// 如果已经是关闭状态,则直接返回,表示无需再次关闭
return nil
}
for _, endpoint := range b.endpoints {
if err := endpoint.Close(); err != nil {
return err
}
}
return nil
}
var (
ErrNoEndpoint = errors.New("no endpoint available")
ErrNoSourceIP = errors.New("no source ip")
)
func CheckBalanceType(balance string) bool {
switch BalanceType(balance) {
case RRBalanceType, WRRBalanceType, ConsistentHashBalanceType, LCBalanceType, SEDBalanceType, WLCBalanceType, NQBalanceType:
return true
}
return false
}