Commit a81a5fd

HBASE-29677: Thread safety in QuotaRefresherChore (#7401)
Signed-off-by: Ray Mattingly <[email protected]>
1 parent 47f7e1d commit a81a5fd

2 files changed: +87 -38 lines

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java

Lines changed: 41 additions & 38 deletions
@@ -20,7 +20,6 @@
 import java.io.IOException;
 import java.time.Duration;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
@@ -70,10 +69,10 @@ public class QuotaCache implements Stoppable {
   private final Object initializerLock = new Object();
   private volatile boolean initialized = false;
 
-  private volatile Map<String, QuotaState> namespaceQuotaCache = new HashMap<>();
-  private volatile Map<TableName, QuotaState> tableQuotaCache = new HashMap<>();
-  private volatile Map<String, UserQuotaState> userQuotaCache = new HashMap<>();
-  private volatile Map<String, QuotaState> regionServerQuotaCache = new HashMap<>();
+  private volatile Map<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
+  private volatile Map<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
+  private volatile Map<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
+  private volatile Map<String, QuotaState> regionServerQuotaCache = new ConcurrentHashMap<>();
 
   private volatile boolean exceedThrottleQuotaEnabled = false;
   // factors used to divide cluster scope quota into machine scope quota
@@ -310,44 +309,48 @@ public synchronized boolean triggerNow() {
 
     @Override
     protected void chore() {
-      updateQuotaFactors();
+      synchronized (this) {
+        LOG.info("Reloading quota cache from hbase:quota table");
+        updateQuotaFactors();
+
+        try {
+          Map<String, UserQuotaState> newUserQuotaCache =
+            new ConcurrentHashMap<>(fetchUserQuotaStateEntries());
+          updateNewCacheFromOld(userQuotaCache, newUserQuotaCache);
+          userQuotaCache = newUserQuotaCache;
+        } catch (IOException e) {
+          LOG.error("Error while fetching user quotas", e);
+        }
 
-      try {
-        Map<String, UserQuotaState> newUserQuotaCache = new HashMap<>(fetchUserQuotaStateEntries());
-        updateNewCacheFromOld(userQuotaCache, newUserQuotaCache);
-        userQuotaCache = newUserQuotaCache;
-      } catch (IOException e) {
-        LOG.error("Error while fetching user quotas", e);
-      }
+        try {
+          Map<String, QuotaState> newRegionServerQuotaCache =
+            new ConcurrentHashMap<>(fetchRegionServerQuotaStateEntries());
+          updateNewCacheFromOld(regionServerQuotaCache, newRegionServerQuotaCache);
+          regionServerQuotaCache = newRegionServerQuotaCache;
+        } catch (IOException e) {
+          LOG.error("Error while fetching region server quotas", e);
+        }
 
-      try {
-        Map<String, QuotaState> newRegionServerQuotaCache =
-          new HashMap<>(fetchRegionServerQuotaStateEntries());
-        updateNewCacheFromOld(regionServerQuotaCache, newRegionServerQuotaCache);
-        regionServerQuotaCache = newRegionServerQuotaCache;
-      } catch (IOException e) {
-        LOG.error("Error while fetching region server quotas", e);
-      }
+        try {
+          Map<TableName, QuotaState> newTableQuotaCache =
+            new ConcurrentHashMap<>(fetchTableQuotaStateEntries());
+          updateNewCacheFromOld(tableQuotaCache, newTableQuotaCache);
+          tableQuotaCache = newTableQuotaCache;
+        } catch (IOException e) {
+          LOG.error("Error while refreshing table quotas", e);
+        }
 
-      try {
-        Map<TableName, QuotaState> newTableQuotaCache =
-          new HashMap<>(fetchTableQuotaStateEntries());
-        updateNewCacheFromOld(tableQuotaCache, newTableQuotaCache);
-        tableQuotaCache = newTableQuotaCache;
-      } catch (IOException e) {
-        LOG.error("Error while refreshing table quotas", e);
-      }
+        try {
+          Map<String, QuotaState> newNamespaceQuotaCache =
+            new ConcurrentHashMap<>(fetchNamespaceQuotaStateEntries());
+          updateNewCacheFromOld(namespaceQuotaCache, newNamespaceQuotaCache);
+          namespaceQuotaCache = newNamespaceQuotaCache;
+        } catch (IOException e) {
+          LOG.error("Error while refreshing namespace quotas", e);
+        }
 
-      try {
-        Map<String, QuotaState> newNamespaceQuotaCache =
-          new HashMap<>(fetchNamespaceQuotaStateEntries());
-        updateNewCacheFromOld(namespaceQuotaCache, newNamespaceQuotaCache);
-        namespaceQuotaCache = newNamespaceQuotaCache;
-      } catch (IOException e) {
-        LOG.error("Error while refreshing namespace quotas", e);
+        fetchExceedThrottleQuota();
      }
-
-      fetchExceedThrottleQuota();
    }
 
    private void fetchExceedThrottleQuota() {

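For readers skimming the diff, the change amounts to a copy-then-publish refresh: each quota category is fetched into a fresh ConcurrentHashMap, merged with the previous map via updateNewCacheFromOld, and then published by overwriting the volatile field, with the whole chore body serialized under a synchronized block. The following is a minimal, self-contained sketch of that pattern only; State, fetchEntries(), and the carry-over loop are hypothetical stand-ins for HBase's QuotaState, fetch*QuotaStateEntries(), and updateNewCacheFromOld, not the actual implementations.

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the copy-then-publish refresh used in chore(). "State" and
// fetchEntries() are stand-ins, not the HBase quota types.
public class CopyThenPublishCache {
  static class State {
    long available;
  }

  // Readers always dereference this volatile field, so they see either the
  // old map or the fully built new one, never a half-populated map.
  private volatile Map<String, State> cache = new ConcurrentHashMap<>();

  // Placeholder for a scan of the hbase:quota table.
  private Map<String, State> fetchEntries() throws IOException {
    return Map.of("my_table", new State());
  }

  // Mirrors the synchronized refresh in QuotaRefresherChore.chore().
  public synchronized void refresh() {
    try {
      Map<String, State> newCache = new ConcurrentHashMap<>(fetchEntries());
      // carry per-entry state over from the old cache (the updateNewCacheFromOld role)
      newCache.forEach((key, fresh) -> {
        State old = cache.get(key);
        if (old != null) {
          fresh.available = old.available;
        }
      });
      cache = newCache; // single volatile write publishes the new snapshot
    } catch (IOException e) {
      // on failure, keep serving the previous cache
      System.err.println("Error while fetching quotas: " + e);
    }
  }

  public Map<String, State> snapshot() {
    return cache;
  }
}

The combination matters: the volatile swap makes the map reference safe to read without locking, while backing each snapshot with a ConcurrentHashMap keeps per-entry mutation between refreshes safe as well.
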
hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache2.java

Lines changed: 46 additions & 0 deletions
@@ -131,4 +131,50 @@ public void testForgetsDeletedQuota() {
     assertTrue(newCache.containsKey("my_table2"));
     assertFalse(newCache.containsKey("my_table1"));
   }
+
+  @Test
+  public void testLearnsNewQuota() {
+    Map<String, QuotaState> oldCache = new HashMap<>();
+
+    QuotaState newState = new QuotaState();
+    Map<String, QuotaState> newCache = new HashMap<>();
+    newCache.put("my_table1", newState);
+
+    QuotaCache.updateNewCacheFromOld(oldCache, newCache);
+
+    assertTrue(newCache.containsKey("my_table1"));
+  }
+
+  @Test
+  public void testUserSpecificOverridesDefaultNewQuota() {
+    // establish old cache with a limiter for 100 read bytes per second
+    QuotaState oldState = new QuotaState();
+    Map<String, QuotaState> oldCache = new HashMap<>();
+    oldCache.put("my_table", oldState);
+    QuotaProtos.Throttle throttle1 = QuotaProtos.Throttle.newBuilder()
+      .setReadSize(QuotaProtos.TimedQuota.newBuilder().setTimeUnit(HBaseProtos.TimeUnit.SECONDS)
+        .setSoftLimit(100).setScope(QuotaProtos.QuotaScope.MACHINE).build())
+      .build();
+    QuotaLimiter limiter1 = TimeBasedLimiter.fromThrottle(conf, throttle1);
+    oldState.setGlobalLimiter(limiter1);
+
+    // establish new cache, with a limiter for 999 read bytes per second
+    QuotaState newState = new QuotaState();
+    Map<String, QuotaState> newCache = new HashMap<>();
+    newCache.put("my_table", newState);
+    QuotaProtos.Throttle throttle2 = QuotaProtos.Throttle.newBuilder()
+      .setReadSize(QuotaProtos.TimedQuota.newBuilder().setTimeUnit(HBaseProtos.TimeUnit.SECONDS)
+        .setSoftLimit(999).setScope(QuotaProtos.QuotaScope.MACHINE).build())
+      .build();
+    QuotaLimiter limiter2 = TimeBasedLimiter.fromThrottle(conf, throttle2);
+    newState.setGlobalLimiter(limiter2);
+
+    // update new cache from old cache
+    QuotaCache.updateNewCacheFromOld(oldCache, newCache);
+
+    // verify that the 999 available bytes from the limiter was carried over
+    TimeBasedLimiter updatedLimiter =
+      (TimeBasedLimiter) newCache.get("my_table").getGlobalLimiter();
+    assertEquals(999, updatedLimiter.getReadAvailable());
+  }
 }

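The new tests pin down the merge semantics of updateNewCacheFromOld: quotas added since the last refresh are learned, and limiter availability survives a refresh. The thread-safety property named in the commit title is easiest to see with a small two-thread demo. The snippet below is illustrative only and not part of the commit; it shows one thread repeatedly publishing fresh maps through a volatile field while another thread reads, which is the reader/refresher interaction the synchronized chore and ConcurrentHashMap-backed caches are meant to keep safe.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative demo: a "chore" thread publishes freshly built maps through a
// volatile field while a reader iterates whatever snapshot it grabbed.
// Neither thread iterates a map the other is mutating, so the read path needs
// no locking.
public class ReaderRefresherDemo {
  private static volatile Map<String, Long> quotaCache = new ConcurrentHashMap<>();

  public static void main(String[] args) throws InterruptedException {
    Thread chore = new Thread(() -> {
      for (long i = 0; i < 10_000; i++) {
        Map<String, Long> fresh = new ConcurrentHashMap<>();
        fresh.put("my_table", i);  // build the replacement privately
        quotaCache = fresh;        // then publish it in one volatile write
      }
    });
    Thread reader = new Thread(() -> {
      for (int i = 0; i < 10_000; i++) {
        Map<String, Long> snapshot = quotaCache; // read the reference once
        snapshot.forEach((table, limit) -> {
          // apply the limit; the snapshot reference cannot change mid-iteration
        });
      }
    });
    chore.start();
    reader.start();
    chore.join();
    reader.join();
    System.out.println("final cache: " + quotaCache);
  }
}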