Skip to content

Commit 6d43f34

Browse files
committed
split into multiple files
1 parent 2d90b47 commit 6d43f34

4 files changed

+293
-276
lines changed

epsilon_greedy.go

+197
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
package hostpool
2+
3+
import (
4+
"log"
5+
"math/rand"
6+
"time"
7+
)
8+
9+
type epsilonHostPoolResponse struct {
10+
standardHostPoolResponse
11+
started time.Time
12+
ended time.Time
13+
}
14+
15+
func (r *epsilonHostPoolResponse) Mark(err error) {
16+
r.Do(func() {
17+
r.ended = time.Now()
18+
doMark(err, r)
19+
})
20+
21+
}
22+
23+
type epsilonGreedyHostPool struct {
24+
standardHostPool // TODO - would be nifty if we could embed HostPool and Locker interfaces
25+
epsilon float32 // this is our exploration factor
26+
decayDuration time.Duration
27+
EpsilonValueCalculator // embed the epsilonValueCalculator
28+
timer
29+
}
30+
31+
// Epsilon Greedy is an algorithim that allows HostPool not only to track failure state,
32+
// but also to learn about "better" options in terms of speed, and to pick from available hosts
33+
// based on a percentage of how well they perform. This gives a weighted request rate to better
34+
// performing hosts, while still distributing requests to all hosts (proportionate to their performance)
35+
//
36+
// After enabling Epsilon Greedy, hosts must be marked for sucess along with a time value representing
37+
// how fast (or slow) that host was.
38+
//
39+
// host := pool.Get()
40+
// start := time.Now()
41+
// ..... do work with host
42+
// duration = time.Now().Sub(start)
43+
// pool.MarkSuccessWithTime(host, duration)
44+
//
45+
// a good overview of Epsilon Greedy is here http://stevehanov.ca/blog/index.php?id=132
46+
//
47+
// decayDuration may be set to 0 to use the default value of 5 minutes
48+
func NewEpsilonGreedy(hosts []string, decayDuration time.Duration, calc EpsilonValueCalculator) HostPool {
49+
50+
if decayDuration <= 0 {
51+
decayDuration = defaultDecayDuration
52+
}
53+
stdHP := New(hosts).(*standardHostPool)
54+
p := &epsilonGreedyHostPool{
55+
standardHostPool: *stdHP,
56+
epsilon: float32(initialEpsilon),
57+
decayDuration: decayDuration,
58+
EpsilonValueCalculator: calc,
59+
timer: &realTimer{},
60+
}
61+
62+
// allocate structures
63+
for _, h := range p.hostList {
64+
h.epsilonCounts = make([]int64, epsilonBuckets)
65+
h.epsilonValues = make([]int64, epsilonBuckets)
66+
}
67+
go p.epsilonGreedyDecay()
68+
return p
69+
}
70+
71+
func (p *epsilonGreedyHostPool) SetEpsilon(newEpsilon float32) {
72+
p.Lock()
73+
defer p.Unlock()
74+
p.epsilon = newEpsilon
75+
}
76+
77+
func (p *epsilonGreedyHostPool) epsilonGreedyDecay() {
78+
durationPerBucket := p.decayDuration / epsilonBuckets
79+
ticker := time.Tick(durationPerBucket)
80+
for {
81+
<-ticker
82+
p.performEpsilonGreedyDecay()
83+
}
84+
}
85+
func (p *epsilonGreedyHostPool) performEpsilonGreedyDecay() {
86+
p.Lock()
87+
for _, h := range p.hostList {
88+
h.epsilonIndex += 1
89+
h.epsilonIndex = h.epsilonIndex % epsilonBuckets
90+
h.epsilonCounts[h.epsilonIndex] = 0
91+
h.epsilonValues[h.epsilonIndex] = 0
92+
}
93+
p.Unlock()
94+
}
95+
96+
func (p *epsilonGreedyHostPool) Get() HostPoolResponse {
97+
p.Lock()
98+
defer p.Unlock()
99+
host := p.getEpsilonGreedy()
100+
started := time.Now()
101+
return &epsilonHostPoolResponse{
102+
standardHostPoolResponse: standardHostPoolResponse{host: host, pool: p},
103+
started: started,
104+
}
105+
}
106+
107+
func (p *epsilonGreedyHostPool) getEpsilonGreedy() string {
108+
var hostToUse *hostEntry
109+
110+
// this is our exploration phase
111+
if rand.Float32() < p.epsilon {
112+
p.epsilon = p.epsilon * epsilonDecay
113+
if p.epsilon < minEpsilon {
114+
p.epsilon = minEpsilon
115+
}
116+
return p.getRoundRobin()
117+
}
118+
119+
// calculate values for each host in the 0..1 range (but not ormalized)
120+
var possibleHosts []*hostEntry
121+
now := time.Now()
122+
var sumValues float64
123+
for _, h := range p.hostList {
124+
if h.canTryHost(now) {
125+
v := h.getWeightedAverageResponseTime()
126+
if v > 0 {
127+
ev := p.CalcValueFromAvgResponseTime(v)
128+
h.epsilonValue = ev
129+
sumValues += ev
130+
possibleHosts = append(possibleHosts, h)
131+
}
132+
}
133+
}
134+
135+
if len(possibleHosts) != 0 {
136+
// now normalize to the 0..1 range to get a percentage
137+
for _, h := range possibleHosts {
138+
h.epsilonPercentage = h.epsilonValue / sumValues
139+
}
140+
141+
// do a weighted random choice among hosts
142+
ceiling := 0.0
143+
pickPercentage := rand.Float64()
144+
for _, h := range possibleHosts {
145+
ceiling += h.epsilonPercentage
146+
if pickPercentage <= ceiling {
147+
hostToUse = h
148+
break
149+
}
150+
}
151+
}
152+
153+
if hostToUse == nil {
154+
if len(possibleHosts) != 0 {
155+
log.Println("Failed to randomly choose a host, Dan loses")
156+
}
157+
return p.getRoundRobin()
158+
}
159+
160+
if hostToUse.dead {
161+
hostToUse.willRetryHost(p.maxRetryInterval)
162+
}
163+
return hostToUse.host
164+
}
165+
166+
func (p *epsilonGreedyHostPool) markSuccess(hostR HostPoolResponse) {
167+
// first do the base markSuccess - a little redundant with host lookup but cleaner than repeating logic
168+
p.standardHostPool.markSuccess(hostR)
169+
eHostR, ok := hostR.(*epsilonHostPoolResponse)
170+
if !ok {
171+
log.Printf("Incorrect type in eps markSuccess!") // TODO reflection to print out offending type
172+
return
173+
}
174+
host := eHostR.host
175+
duration := p.between(eHostR.started, eHostR.ended)
176+
177+
p.Lock()
178+
defer p.Unlock()
179+
h, ok := p.hosts[host]
180+
if !ok {
181+
log.Fatalf("host %s not in HostPool %v", host, p.Hosts())
182+
}
183+
h.epsilonCounts[h.epsilonIndex]++
184+
h.epsilonValues[h.epsilonIndex] += int64(duration.Seconds() * 1000)
185+
}
186+
187+
// --- timer: this just exists for testing
188+
189+
type timer interface {
190+
between(time.Time, time.Time) time.Duration
191+
}
192+
193+
type realTimer struct{}
194+
195+
func (rt *realTimer) between(start time.Time, end time.Time) time.Duration {
196+
return end.Sub(start)
197+
}

epsilon_value_calculators.go

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package hostpool
2+
3+
// --- Value Calculators -----------------
4+
5+
import (
6+
"math"
7+
)
8+
9+
// --- Definitions -----------------------
10+
11+
type EpsilonValueCalculator interface {
12+
CalcValueFromAvgResponseTime(float64) float64
13+
}
14+
15+
type LinearEpsilonValueCalculator struct{}
16+
type LogEpsilonValueCalculator struct{ LinearEpsilonValueCalculator }
17+
type PolynomialEpsilonValueCalculator struct {
18+
LinearEpsilonValueCalculator
19+
Exp float64 // the exponent to which we will raise the value to reweight
20+
}
21+
22+
// -------- Methods -----------------------
23+
24+
func (c *LinearEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
25+
return 1.0 / v
26+
}
27+
28+
func (c *LogEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
29+
return math.Log(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v))
30+
}
31+
32+
func (c *PolynomialEpsilonValueCalculator) CalcValueFromAvgResponseTime(v float64) float64 {
33+
return math.Pow(c.LinearEpsilonValueCalculator.CalcValueFromAvgResponseTime(v), c.Exp)
34+
}

host_entry.go

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package hostpool
2+
3+
import (
4+
"time"
5+
)
6+
7+
// --- hostEntry - this is due to get upgraded
8+
9+
type hostEntry struct {
10+
host string
11+
nextRetry time.Time
12+
retryCount int16
13+
retryDelay time.Duration
14+
dead bool
15+
epsilonCounts []int64
16+
epsilonValues []int64
17+
epsilonIndex int
18+
epsilonValue float64
19+
epsilonPercentage float64
20+
}
21+
22+
func (h *hostEntry) canTryHost(now time.Time) bool {
23+
if !h.dead {
24+
return true
25+
}
26+
if h.nextRetry.Before(now) {
27+
return true
28+
}
29+
return false
30+
}
31+
32+
func (h *hostEntry) willRetryHost(maxRetryInterval time.Duration) {
33+
h.retryCount += 1
34+
newDelay := h.retryDelay * 2
35+
if newDelay < maxRetryInterval {
36+
h.retryDelay = newDelay
37+
} else {
38+
h.retryDelay = maxRetryInterval
39+
}
40+
h.nextRetry = time.Now().Add(h.retryDelay)
41+
}
42+
43+
func (h *hostEntry) getWeightedAverageResponseTime() float64 {
44+
var value float64
45+
var lastValue float64
46+
47+
// start at 1 so we start with the oldest entry
48+
for i := 1; i <= epsilonBuckets; i += 1 {
49+
pos := (h.epsilonIndex + i) % epsilonBuckets
50+
bucketCount := h.epsilonCounts[pos]
51+
// Changing the line below to what I think it should be to get the weights right
52+
weight := float64(i) / float64(epsilonBuckets)
53+
if bucketCount > 0 {
54+
currentValue := float64(h.epsilonValues[pos]) / float64(bucketCount)
55+
value += currentValue * weight
56+
lastValue = currentValue
57+
} else {
58+
value += lastValue * weight
59+
}
60+
}
61+
return value
62+
}

0 commit comments

Comments
 (0)