
Commit a79100b

Authored by Ray Mattingly (rmdmattingly) and co-author
HBASE-29351 Quotas: adaptive wait intervals (#7396)
Co-authored-by: Ray Mattingly <[email protected]>
Signed-off-by: Charles Connell <[email protected]>
1 parent a81a5fd commit a79100b

File tree: 3 files changed, +923 -5 lines changed
FeedbackAdaptiveRateLimiter.java: 361 additions & 0 deletions
@@ -0,0 +1,361 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicDouble;

/**
 * An adaptive rate limiter that dynamically adjusts its behavior based on observed usage patterns
 * to achieve stable, full utilization of configured quota allowances while managing client
 * contention.
 * <p>
 * <b>Core Algorithm:</b> This rate limiter divides time into fixed refill intervals (configurable
 * via {@code hbase.quota.rate.limiter.refill.interval.ms}, default is 1 refill per TimeUnit of the
 * RateLimiter). At the beginning of each interval, a fresh allocation of resources becomes
 * available based on the configured limit. Clients consume resources as they make requests. When
 * resources are exhausted, clients must wait until the next refill, or until enough resources
 * become available.
 * <p>
 * <b>Adaptive Backpressure:</b> When multiple threads compete for limited resources (contention),
 * this limiter detects the contention and applies increasing backpressure by extending wait
 * intervals. This prevents thundering herd behavior where many threads wake simultaneously and
 * compete for the same resources. The backoff multiplier increases by a small increment (see
 * {@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT}) per interval when contention occurs, and
 * decreases (see {@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT}) when no contention is
 * detected, converging toward optimal throughput. The multiplier is capped at a maximum value (see
 * {@link #FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER}) to prevent unbounded waits.
 * <p>
 * Contention is detected when {@link #getWaitInterval} is called with insufficient available
 * resources (i.e., {@code amount > available}), indicating a thread needs to wait for resources. If
 * this occurs more than once in a refill interval, the limiter identifies it as contention
 * requiring increased backpressure.
 * <p>
 * <b>Oversubscription for Full Utilization:</b> In practice, synchronization overhead and timing
 * variations often prevent clients from consuming exactly their full allowance, resulting in
 * consistent under-utilization. This limiter addresses this by tracking utilization via an
 * exponentially weighted moving average (EWMA). When average utilization falls below the target
 * range (determined by {@link #FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET}), the limiter gradually
 * increases the oversubscription proportion (see
 * {@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT}), allowing more resources per interval than
 * the base limit. Conversely, when utilization exceeds the target range, oversubscription is
 * decreased (see {@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT}). Oversubscription is capped
 * (see {@link #FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION}) to prevent excessive bursts while still
 * enabling consistent full utilization.
 * <p>
 * <b>Example Scenario:</b> Consider a quota of 1000 requests per second with a 1-second refill
 * interval. Without oversubscription, clients might typically achieve only 950 req/s due to
 * coordination delays. This limiter would detect the under-utilization and gradually increase
 * oversubscription, allowing slightly more resources per interval, which compensates for
 * inefficiencies and achieves stable throughput closer to the configured quota. If multiple threads
 * simultaneously try to consume resources and repeatedly wait, the backoff multiplier increases
 * their wait times, spreading out their retry attempts and reducing wasted CPU cycles.
 * <p>
 * <b>Configuration Parameters:</b>
 * <ul>
 * <li>{@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT}: Controls rate of backpressure
 * increase</li>
 * <li>{@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT}: Controls rate of backpressure
 * decrease</li>
 * <li>{@link #FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER}: Caps the maximum wait time extension</li>
 * <li>{@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT}: Controls rate of oversubscription
 * increase</li>
 * <li>{@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT}: Controls rate of oversubscription
 * decrease</li>
 * <li>{@link #FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION}: Caps the maximum burst capacity</li>
 * <li>{@link #FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET}: Defines the acceptable range around full
 * utilization</li>
 * </ul>
 * <p>
 * This algorithm converges toward stable operation where: (1) wait intervals are just long enough
 * to prevent excessive contention, and (2) oversubscription is just high enough to achieve
 * consistent full utilization of the configured allowance.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FeedbackAdaptiveRateLimiter extends RateLimiter {

  /**
   * Amount to increase the backoff multiplier when contention is detected per refill interval. In
   * other words, if we are throttling more than once per refill interval, then we will increase
   * our wait intervals (increase backpressure, decrease throughput).
   */
  public static final String FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT =
    "hbase.quota.rate.limiter.feedback.adaptive.backoff.multiplier.increment";
  public static final double DEFAULT_BACKOFF_MULTIPLIER_INCREMENT = 0.0005;

  /**
   * Amount to decrease the backoff multiplier when no contention is detected per refill interval.
   * In other words, if we are only throttling once per refill interval, then we will decrease our
   * wait interval (decrease backpressure, increase throughput).
   */
  public static final String FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT =
    "hbase.quota.rate.limiter.feedback.adaptive.backoff.multiplier.decrement";
  public static final double DEFAULT_BACKOFF_MULTIPLIER_DECREMENT = 0.0001;

  /**
   * Maximum ceiling for the backoff multiplier to avoid unbounded waits.
   */
  public static final String FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER =
    "hbase.quota.rate.limiter.feedback.adaptive.max.backoff.multiplier";
  public static final double DEFAULT_MAX_BACKOFF_MULTIPLIER = 10.0;

  /**
   * Amount to increase the oversubscription proportion when utilization is below
   * (1.0-errorBudget).
   */
  public static final String FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT =
    "hbase.quota.rate.limiter.feedback.adaptive.oversubscription.increment";
  public static final double DEFAULT_OVERSUBSCRIPTION_INCREMENT = 0.001;

  /**
   * Amount to decrease the oversubscription proportion when utilization exceeds
   * (1.0+errorBudget).
   */
  public static final String FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT =
    "hbase.quota.rate.limiter.feedback.adaptive.oversubscription.decrement";
  public static final double DEFAULT_OVERSUBSCRIPTION_DECREMENT = 0.00005;

  /**
   * Maximum ceiling for oversubscription to prevent unbounded bursts. Some oversubscription can be
   * nice, because it allows you to balance the inefficiency and latency of retries, landing on
   * stable usage at approximately your configured allowance. Without adequate oversubscription,
   * your steady state may often seem significantly, and suspiciously, lower than your configured
   * allowance.
   */
  public static final String FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION =
    "hbase.quota.rate.limiter.feedback.adaptive.max.oversubscription";
  public static final double DEFAULT_MAX_OVERSUBSCRIPTION = 0.25;

  /**
   * Acceptable deviation around full utilization (1.0) for adjusting oversubscription. If stable
   * throttle usage is typically under (1.0-errorBudget), then we will allow more oversubscription.
   * If stable throttle usage is typically over (1.0+errorBudget), then we will pull back
   * oversubscription.
   */
  public static final String FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET =
    "hbase.quota.rate.limiter.feedback.adaptive.utilization.error.budget";
  public static final double DEFAULT_UTILIZATION_ERROR_BUDGET = 0.025;

  private static final int WINDOW_TIME_MS = 60_000;

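  // For intuition, assuming a 1 s refill interval: the default backoff increment of 0.0005 per
  // contended interval raises the multiplier by roughly 0.03 per minute of sustained contention,
  // so climbing from 1.0 to the 10.0 cap takes on the order of 18,000 contended intervals (about
  // 5 hours), while the 0.0001 decrement unwinds it even more gradually. Likewise, the 0.001
  // oversubscription increment reaches the 0.25 cap after about 250 under-utilized intervals
  // (about 4 minutes), and the 0.00005 decrement takes about 5,000 quiet intervals (roughly
  // 83 minutes) to return to zero.
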
  public static class FeedbackAdaptiveRateLimiterFactory {

    private final long refillInterval;
    private final double backoffMultiplierIncrement;
    private final double backoffMultiplierDecrement;
    private final double maxBackoffMultiplier;
    private final double oversubscriptionIncrement;
    private final double oversubscriptionDecrement;
    private final double maxOversubscription;
    private final double utilizationErrorBudget;

    public FeedbackAdaptiveRateLimiterFactory(Configuration conf) {
      refillInterval = conf.getLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS,
        RateLimiter.DEFAULT_TIME_UNIT);

      maxBackoffMultiplier =
        conf.getDouble(FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER, DEFAULT_MAX_BACKOFF_MULTIPLIER);

      backoffMultiplierIncrement = conf.getDouble(FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT,
        DEFAULT_BACKOFF_MULTIPLIER_INCREMENT);
      backoffMultiplierDecrement = conf.getDouble(FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT,
        DEFAULT_BACKOFF_MULTIPLIER_DECREMENT);

      oversubscriptionIncrement = conf.getDouble(FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT,
        DEFAULT_OVERSUBSCRIPTION_INCREMENT);
      oversubscriptionDecrement = conf.getDouble(FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT,
        DEFAULT_OVERSUBSCRIPTION_DECREMENT);

      maxOversubscription =
        conf.getDouble(FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION, DEFAULT_MAX_OVERSUBSCRIPTION);
      utilizationErrorBudget = conf.getDouble(FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET,
        DEFAULT_UTILIZATION_ERROR_BUDGET);
    }

    public FeedbackAdaptiveRateLimiter create() {
      return new FeedbackAdaptiveRateLimiter(refillInterval, backoffMultiplierIncrement,
        backoffMultiplierDecrement, maxBackoffMultiplier, oversubscriptionIncrement,
        oversubscriptionDecrement, maxOversubscription, utilizationErrorBudget);
    }
  }
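
  // A construction sketch (assuming callers hold a Configuration, as quota setup code would):
  //
  //   FeedbackAdaptiveRateLimiter limiter =
  //     new FeedbackAdaptiveRateLimiterFactory(conf).create();
  //   // then configure the allowance via the base RateLimiter API, e.g. a per-second limit
  //
  // The factory reads every feedback-adaptive setting once in its constructor, so repeated
  // create() calls hand out limiters with identical tuning.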

  private volatile long nextRefillTime = -1L;
  private final long refillInterval;
  private final double backoffMultiplierIncrement;
  private final double backoffMultiplierDecrement;
  private final double maxBackoffMultiplier;
  private final double oversubscriptionIncrement;
  private final double oversubscriptionDecrement;
  private final double maxOversubscription;
  private final double minTargetUtilization;
  private final double maxTargetUtilization;

  // Adaptive backoff state
  private final AtomicDouble currentBackoffMultiplier = new AtomicDouble(1.0);
  private volatile boolean hadContentionThisInterval = false;

  // Over-subscription proportion state
  private final AtomicDouble oversubscriptionProportion = new AtomicDouble(0.0);

  // EWMA tracking
  private final double emaAlpha;
  private volatile double utilizationEma = 0.0;
  private final AtomicLong lastIntervalConsumed;

  FeedbackAdaptiveRateLimiter(long refillInterval, double backoffMultiplierIncrement,
    double backoffMultiplierDecrement, double maxBackoffMultiplier,
    double oversubscriptionIncrement, double oversubscriptionDecrement, double maxOversubscription,
    double utilizationErrorBudget) {
    super();
    Preconditions.checkArgument(getTimeUnitInMillis() >= refillInterval, String.format(
      "Refill interval %s must be ≤ TimeUnit millis %s", refillInterval, getTimeUnitInMillis()));

    Preconditions.checkArgument(backoffMultiplierIncrement > 0.0,
      String.format("Backoff multiplier increment %s must be > 0.0", backoffMultiplierIncrement));
    Preconditions.checkArgument(backoffMultiplierDecrement > 0.0,
      String.format("Backoff multiplier decrement %s must be > 0.0", backoffMultiplierDecrement));
    Preconditions.checkArgument(maxBackoffMultiplier > 1.0,
      String.format("Max backoff multiplier %s must be > 1.0", maxBackoffMultiplier));
    Preconditions.checkArgument(utilizationErrorBudget > 0.0 && utilizationErrorBudget <= 1.0,
      String.format("Utilization error budget %s must be between 0.0 and 1.0",
        utilizationErrorBudget));

    this.refillInterval = refillInterval;
    this.backoffMultiplierIncrement = backoffMultiplierIncrement;
    this.backoffMultiplierDecrement = backoffMultiplierDecrement;
    this.maxBackoffMultiplier = maxBackoffMultiplier;
    this.oversubscriptionIncrement = oversubscriptionIncrement;
    this.oversubscriptionDecrement = oversubscriptionDecrement;
    this.maxOversubscription = maxOversubscription;
    this.minTargetUtilization = 1.0 - utilizationErrorBudget;
    this.maxTargetUtilization = 1.0 + utilizationErrorBudget;

    this.emaAlpha = refillInterval / (double) (WINDOW_TIME_MS + refillInterval);
    this.lastIntervalConsumed = new AtomicLong(0);
  }
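
  // With the 1 s refill interval used in the class-level example, emaAlpha is
  // 1000 / (60_000 + 1000) ≈ 0.0164, so each interval's observed utilization contributes under
  // 2% to the moving average and the EWMA effectively smooths over roughly the last minute.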

  @Override
  public long refill(long limit) {
    final long now = EnvironmentEdgeManager.currentTime();
    if (nextRefillTime == -1) {
      nextRefillTime = now + refillInterval;
      hadContentionThisInterval = false;
      return getOversubscribedLimit(limit);
    }
    if (now < nextRefillTime) {
      return 0;
    }
    long diff = refillInterval + now - nextRefillTime;
    long refills = diff / refillInterval;
    nextRefillTime = now + refillInterval;

    long intendedUsage = getRefillIntervalAdjustedLimit(limit);
    if (intendedUsage > 0) {
      long consumed = lastIntervalConsumed.get();
      if (consumed > 0) {
        double util = (double) consumed / intendedUsage;
        utilizationEma = emaAlpha * util + (1.0 - emaAlpha) * utilizationEma;
      }
    }

    if (hadContentionThisInterval) {
      currentBackoffMultiplier.set(Math
        .min(currentBackoffMultiplier.get() + backoffMultiplierIncrement, maxBackoffMultiplier));
    } else {
      currentBackoffMultiplier
        .set(Math.max(currentBackoffMultiplier.get() - backoffMultiplierDecrement, 1.0));
    }

    double avgUtil = utilizationEma;
    if (avgUtil < minTargetUtilization) {
      oversubscriptionProportion.set(Math
        .min(oversubscriptionProportion.get() + oversubscriptionIncrement, maxOversubscription));
    } else if (avgUtil >= maxTargetUtilization) {
      oversubscriptionProportion
        .set(Math.max(oversubscriptionProportion.get() - oversubscriptionDecrement, 0.0));
    }

    hadContentionThisInterval = false;
    lastIntervalConsumed.set(0);

    long refillAmount = refills * getRefillIntervalAdjustedLimit(limit);
    long maxRefill = getOversubscribedLimit(limit);
    return Math.min(maxRefill, refillAmount);
  }
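
  // Worked example of the refill math above, assuming a 1 s TimeUnit, a 1 s refill interval, a
  // limit of 1000, and an oversubscription proportion of 0.05: if refill() runs 1.5 s after the
  // scheduled nextRefillTime, then diff = 1000 + 1500 = 2500 ms, so refills = 2 and
  // refillAmount = 2000, but maxRefill = getOversubscribedLimit(1000) = 1050 caps the returned
  // credit at 1050.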

  private long getOversubscribedLimit(long limit) {
    return limit + (long) (limit * oversubscriptionProportion.get());
  }

  @Override
  public void consume(long amount) {
    super.consume(amount);
    lastIntervalConsumed.addAndGet(amount);
  }

  @Override
  public long getWaitInterval(long limit, long available, long amount) {
    limit = getRefillIntervalAdjustedLimit(limit);
    if (nextRefillTime == -1) {
      return 0;
    }

    final long now = EnvironmentEdgeManager.currentTime();
    final long refillTime = nextRefillTime;
    long diff = amount - available;
    if (diff > 0) {
      hadContentionThisInterval = true;
    }

    long nextInterval = refillTime - now;
    if (diff <= limit) {
      return applyBackoffMultiplier(nextInterval);
    }

    long extra = diff / limit;
    if (diff % limit == 0) {
      extra--;
    }
    long baseWait = nextInterval + (extra * refillInterval);
    return applyBackoffMultiplier(baseWait);
  }
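
  // Worked example of the wait computation above, assuming a 1 s refill interval, a per-interval
  // limit of 100, 30 units available, a request for 250, 400 ms until the next refill, and a
  // current backoff multiplier of 1.2: diff = 220 > limit, extra = 220 / 100 = 2 (no adjustment,
  // since 220 % 100 != 0), baseWait = 400 + 2 * 1000 = 2400 ms, and the returned wait is
  // 2400 * 1.2 = 2880 ms.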

  private long getRefillIntervalAdjustedLimit(long limit) {
    return (long) Math.ceil(refillInterval / (double) getTimeUnitInMillis() * limit);
  }
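
  // For example, with a 1 s TimeUnit and a 100 ms refill interval, a limit of 1000 per second is
  // adjusted to Math.ceil(100 / 1000.0 * 1000) = 100 units per refill interval.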

  private long applyBackoffMultiplier(long baseWaitInterval) {
    return (long) (baseWaitInterval * currentBackoffMultiplier.get());
  }

  // strictly for testing
  @Override
  public void setNextRefillTime(long nextRefillTime) {
    this.nextRefillTime = nextRefillTime;
  }

  @Override
  public long getNextRefillTime() {
    return this.nextRefillTime;
  }
}
