Skip to content

Commit

Permalink
feat(lb): PeakEWMA load balancer
Browse files Browse the repository at this point in the history
Latency-based load balancing algorithm supporting fault isolation
  • Loading branch information
jizhuozhi committed Mar 3, 2024
1 parent e6ce5f7 commit 8106ba2
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions pkg/loadbalance/peak_ewma_load_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ import (
"golang.org/x/sync/singleflight"
)

var alpha = 1 - math.Exp(-5.0/60.0/1) // for 1-minute moving average

type ewmaInstance struct {
discovery.Instance
mu sync.Mutex
alpha float64
activeReqs int64
lastTick time.Time
latency float64
Expand All @@ -53,9 +54,9 @@ func (e *ewmaInstance) Update(_ context.Context, duration time.Duration, value i
e.mu.Lock()
defer e.mu.Unlock()
e.lastTick = now
e.latency = ewma(e.alpha, e.latency, float64(duration.Milliseconds()), d)
e.errs = ewma(e.alpha, e.errs, float64(err), d)
e.total = ewma(e.alpha, e.total, float64(1), d)
e.latency = ewma(alpha, e.latency, float64(duration.Milliseconds()), d)
e.errs = ewma(alpha, e.errs, float64(err), d)
e.total = ewma(alpha, e.total, float64(1), d)
}

const defaultEWMALatency = 1000
Expand All @@ -69,12 +70,13 @@ func (e *ewmaInstance) score() float64 {
total := e.total
e.mu.Unlock()

weight := float64(e.Weight())
if latency == 0 {
latency = defaultEWMALatency
}
successRate := 1 - errs/(total+1)

return activeReqs * latency / successRate
return activeReqs * latency / successRate / weight
}

func ewma(alpha, v, i float64, d float64) float64 {
Expand All @@ -91,24 +93,26 @@ func (e *PeakEWMAPicker) Next(_ context.Context, _ interface{}) discovery.Instan
}
a := e.instances[fastrand.Int()%len(e.instances)]
b := e.instances[fastrand.Int()%len(e.instances)]

var selected *ewmaInstance
if a.score() < b.score() {
return a
selected = a
} else {
selected = b
}
return b
atomic.AddInt64(&selected.activeReqs, 1)

return selected
}

func newPeakEWMAPicker(instances []discovery.Instance) Picker {
newInstances := make([]*ewmaInstance, len(instances))
for i, ins := range instances {
newInstances[i] = &ewmaInstance{Instance: ins, alpha: weightedAlpha(float64(ins.Weight()))}
newInstances[i] = &ewmaInstance{Instance: ins}
}
return &PeakEWMAPicker{instances: newInstances}
}

func weightedAlpha(w float64) float64 {
return 1 - math.Exp(-5.0/60.0/w) // for a W minute moving average.
}

type peakEWMABalancer struct {
pickerCache sync.Map
sfg singleflight.Group
Expand Down

0 comments on commit 8106ba2

Please sign in to comment.