From 84e84de2f6559f711da3505859053f77cb75699a Mon Sep 17 00:00:00 2001 From: Elias Carter Date: Tue, 23 Sep 2025 22:33:01 +0000 Subject: [PATCH] clientv3: backoff resetting LeaseKeepAlive stream A large number of client leases can cause cascading failures within the etcd cluster. Currently, when the keepalive stream has an error we will always wait 500ms and then try to recreate the stream with LeaseKeepAlive(). Since there is no backoff or jitter, if the lease streams originally broke due to overload on the servers the retries can cause a cascading failure and put more load on the servers. We can backoff and jitter -- similar to what is done in watch streams -- in order to alleviate server load in the case where leases are causing the overload. Signed-off-by: Elias Carter --- client/v3/lease.go | 17 +++++++++++++---- client/v3/utils.go | 9 +++++++++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/client/v3/lease.go b/client/v3/lease.go index 11b583482863..cee3389f154d 100644 --- a/client/v3/lease.go +++ b/client/v3/lease.go @@ -82,8 +82,12 @@ const ( // NoLease is a lease ID for the absence of a lease. NoLease LeaseID = 0 - // retryConnWait is how long to wait before retrying request due to an error - retryConnWait = 500 * time.Millisecond + // retryConnMinBackoff is the starting backoff when retrying a request due to an error + retryConnMinBackoff = 500 * time.Millisecond + // retryConnMaxBackoff is the max backoff when retrying a request due to an error + retryConnMaxBackoff = 15 * time.Second + // sendKeepaliveFrequency is how often to send keepalives + sendKeepaliveFrequency = 500 * time.Millisecond ) // LeaseResponseChSize is the size of buffer to store unsent lease responses. @@ -458,9 +462,11 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) { l.mu.Unlock() }() + backoffGeneration := 0 for { stream, err := l.resetRecv() if err != nil { + backoffGeneration++ l.lg.Warn("error occurred during lease keep alive loop", zap.Error(err), ) @@ -468,6 +474,7 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) { return err } } else { + backoffGeneration = 0 for { resp, err := stream.Recv() if err != nil { @@ -485,8 +492,10 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) { } } + backoff := jitterUp(expBackoff(backoffGeneration, retryConnMinBackoff, retryConnMaxBackoff), 0.5) + select { - case <-time.After(retryConnWait): + case <-time.After(backoff): case <-l.stopCtx.Done(): return l.stopCtx.Err() } @@ -607,7 +616,7 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) { } select { - case <-time.After(retryConnWait): + case <-time.After(sendKeepaliveFrequency): case <-stream.Context().Done(): return case <-l.donec: diff --git a/client/v3/utils.go b/client/v3/utils.go index 850275877d32..6a8da12e1447 100644 --- a/client/v3/utils.go +++ b/client/v3/utils.go @@ -15,6 +15,7 @@ package clientv3 import ( + "math" "math/rand" "time" ) @@ -29,3 +30,11 @@ func jitterUp(duration time.Duration, jitter float64) time.Duration { multiplier := jitter * (rand.Float64()*2 - 1) return time.Duration(float64(duration) * (1 + multiplier)) } + +// expBackoff returns an exponential backoff duration. +// +// This will double the duration each generation and clamp between [minDelay, maxDelay] +func expBackoff(generation int, minDelay, maxDelay time.Duration) time.Duration { + delay := math.Min(math.Pow(2, float64(generation))*float64(minDelay), float64(maxDelay)) + return time.Duration(delay) +}