Skip to content

Commit 0df6d15

Browse files
committed
Add row cache framework
1 parent dfca61b commit 0df6d15

File tree

15 files changed

+709
-37
lines changed

15 files changed

+709
-37
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,4 +316,11 @@ default boolean matchReplicationScope(boolean enabled) {
316316
}
317317
return !enabled;
318318
}
319+
320+
/**
321+
* Reports whether the row cache is enabled for this table. Row caching works on whole rows only;
322+
* it is a table-level setting and cannot be configured per column family.
323+
* @return {@code true} if row cache is enabled, otherwise {@code false}
324+
*/
325+
boolean isRowCacheEnabled();
319326
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,15 @@ public class TableDescriptorBuilder {
227227
private final static Map<String, String> DEFAULT_VALUES = new HashMap<>();
228228
private final static Set<Bytes> RESERVED_KEYWORDS = new HashSet<>();
229229

230+
/**
231+
* Used by HBase Shell interface to access this metadata attribute which denotes if the row cache
232+
* is enabled.
233+
*/
234+
@InterfaceAudience.Private
235+
public static final String ROW_CACHE_ENABLED = "ROW_CACHE_ENABLED";
236+
private static final Bytes ROW_CACHE_ENABLED_KEY = new Bytes(Bytes.toBytes(ROW_CACHE_ENABLED));
237+
private static final boolean DEFAULT_ROW_CACHE_ENABLED = false;
238+
230239
static {
231240
DEFAULT_VALUES.put(MAX_FILESIZE, String.valueOf(HConstants.DEFAULT_MAX_FILE_SIZE));
232241
DEFAULT_VALUES.put(READONLY, String.valueOf(DEFAULT_READONLY));
@@ -236,6 +245,7 @@ public class TableDescriptorBuilder {
236245
DEFAULT_VALUES.put(PRIORITY, String.valueOf(DEFAULT_PRIORITY));
237246
// Setting ERASURE_CODING_POLICY to NULL so that it is not considered as metadata
238247
DEFAULT_VALUES.put(ERASURE_CODING_POLICY, String.valueOf(DEFAULT_ERASURE_CODING_POLICY));
248+
DEFAULT_VALUES.put(ROW_CACHE_ENABLED, String.valueOf(DEFAULT_ROW_CACHE_ENABLED));
239249
DEFAULT_VALUES.keySet().stream().map(s -> new Bytes(Bytes.toBytes(s)))
240250
.forEach(RESERVED_KEYWORDS::add);
241251
RESERVED_KEYWORDS.add(IS_META_KEY);
@@ -565,6 +575,11 @@ public TableDescriptor build() {
565575
return new ModifyableTableDescriptor(desc);
566576
}
567577

578+
public TableDescriptorBuilder setRowCacheEnabled(boolean rowCacheEnabled) {
579+
desc.setRowCacheEnabled(rowCacheEnabled);
580+
return this;
581+
}
582+
568583
private static final class ModifyableTableDescriptor
569584
implements TableDescriptor, Comparable<ModifyableTableDescriptor> {
570585

@@ -1510,6 +1525,15 @@ public Optional<String> getRegionServerGroup() {
15101525
return Optional.empty();
15111526
}
15121527
}
1528+
1529+
@Override
1530+
public boolean isRowCacheEnabled() {
1531+
return getOrDefault(ROW_CACHE_ENABLED_KEY, Boolean::valueOf, DEFAULT_ROW_CACHE_ENABLED);
1532+
}
1533+
1534+
private ModifyableTableDescriptor setRowCacheEnabled(boolean enabled) {
1535+
return setValue(ROW_CACHE_ENABLED_KEY, Boolean.toString(enabled));
1536+
}
15131537
}
15141538

15151539
/**

hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,6 +1017,12 @@ public enum OperationStatusCode {
10171017

10181018
public static final float HFILE_BLOCK_CACHE_SIZE_DEFAULT = 0.4f;
10191019

1020+
/**
1021+
* Configuration key for the size of the row cache, given as a fraction of the maximum heap size (e.g. 0.1 = 10%)
1022+
*/
1023+
public static final String ROW_CACHE_SIZE_KEY = "row.cache.size";
1024+
public static final float ROW_CACHE_SIZE_DEFAULT = 0.0f;
1025+
10201026
/**
10211027
* Configuration key for the memory size of the block cache
10221028
*/

hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,25 +93,29 @@ public static void validateRegionServerHeapMemoryAllocation(Configuration conf)
9393
}
9494
float memStoreFraction = getGlobalMemStoreHeapPercent(conf, false);
9595
float blockCacheFraction = getBlockCacheHeapPercent(conf);
96+
float rowCacheFraction =
97+
conf.getFloat(HConstants.ROW_CACHE_SIZE_KEY, HConstants.ROW_CACHE_SIZE_DEFAULT);
9698
float minFreeHeapFraction = getRegionServerMinFreeHeapFraction(conf);
9799

98100
int memStorePercent = (int) (memStoreFraction * 100);
99101
int blockCachePercent = (int) (blockCacheFraction * 100);
102+
int rowCachePercent = (int) (rowCacheFraction * 100);
100103
int minFreeHeapPercent = (int) (minFreeHeapFraction * 100);
101-
int usedPercent = memStorePercent + blockCachePercent;
104+
int usedPercent = memStorePercent + blockCachePercent + rowCachePercent;
102105
int maxAllowedUsed = 100 - minFreeHeapPercent;
103106

104107
if (usedPercent > maxAllowedUsed) {
105108
throw new RuntimeException(String.format(
106109
"RegionServer heap memory allocation is invalid: total memory usage exceeds 100%% "
107-
+ "(memStore + blockCache + requiredFreeHeap). "
108-
+ "Check the following configuration values:%n" + " - %s = %.2f%n" + " - %s = %s%n"
109-
+ " - %s = %s%n" + " - %s = %s",
110+
+ "(memStore + blockCache + rowCache + requiredFreeHeap). "
111+
+ "Check the following configuration values:" + "%n - %s = %.2f" + "%n - %s = %s"
112+
+ "%n - %s = %s" + "%n - %s = %s" + "%n - %s = %s",
110113
MEMSTORE_SIZE_KEY, memStoreFraction, HConstants.HFILE_BLOCK_CACHE_MEMORY_SIZE_KEY,
111114
conf.get(HConstants.HFILE_BLOCK_CACHE_MEMORY_SIZE_KEY),
112115
HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, conf.get(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY),
113116
HBASE_REGION_SERVER_FREE_HEAP_MIN_MEMORY_SIZE_KEY,
114-
conf.get(HBASE_REGION_SERVER_FREE_HEAP_MIN_MEMORY_SIZE_KEY)));
117+
conf.get(HBASE_REGION_SERVER_FREE_HEAP_MIN_MEMORY_SIZE_KEY), HConstants.ROW_CACHE_SIZE_KEY,
118+
conf.get(HConstants.ROW_CACHE_SIZE_KEY)));
115119
}
116120
}
117121

@@ -313,4 +317,15 @@ public static long getBucketCacheSize(final Configuration conf) {
313317
}
314318
return (long) (bucketCacheSize * 1024 * 1024);
315319
}
320+
321+
public static long getRowCacheSize(Configuration conf) {
322+
long max = -1L;
323+
final MemoryUsage usage = safeGetHeapMemoryUsage();
324+
if (usage != null) {
325+
max = usage.getMax();
326+
}
327+
float globalRowCachePercent =
328+
conf.getFloat(HConstants.ROW_CACHE_SIZE_KEY, HConstants.ROW_CACHE_SIZE_DEFAULT);
329+
return ((long) (max * globalRowCachePercent));
330+
}
316331
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import java.util.concurrent.TimeUnit;
6666
import java.util.concurrent.atomic.AtomicBoolean;
6767
import java.util.concurrent.atomic.AtomicInteger;
68+
import java.util.concurrent.atomic.AtomicLong;
6869
import java.util.concurrent.atomic.LongAdder;
6970
import java.util.concurrent.locks.Lock;
7071
import java.util.concurrent.locks.ReadWriteLock;
@@ -433,6 +434,11 @@ public MetricsTableRequests getMetricsTableRequests() {
433434
*/
434435
private long openSeqNum = HConstants.NO_SEQNUM;
435436

437+
/**
438+
* Set to openSeqNum when the region is opened, then incremented when a bulk load is done to invalidate the row cache of this region.
439+
*/
440+
private final AtomicLong rowCacheSeqNum = new AtomicLong(HConstants.NO_SEQNUM);
441+
436442
/**
437443
* The default setting for whether to enable on-demand CF loading for scan requests to this
438444
* region. Requests can override it.
@@ -7881,6 +7887,7 @@ private HRegion openHRegion(final CancelableProgressable reporter) throws IOExce
78817887
LOG.debug("checking classloading for " + this.getRegionInfo().getEncodedName());
78827888
TableDescriptorChecker.checkClassLoading(cConfig, htableDescriptor);
78837889
this.openSeqNum = initialize(reporter);
7890+
this.rowCacheSeqNum.set(this.openSeqNum);
78847891
this.mvcc.advanceTo(openSeqNum);
78857892
// The openSeqNum must be increased every time when a region is assigned, as we rely on it to
78867893
// determine whether a region has been successfully reopened. So here we always write open
@@ -8709,6 +8716,17 @@ public long getOpenSeqNum() {
87098716
return this.openSeqNum;
87108717
}
87118718

8719+
public long getRowCacheSeqNum() {
8720+
return this.rowCacheSeqNum.get();
8721+
}
8722+
8723+
/**
8724+
* This is used to invalidate the row cache of the bulk-loaded region.
8725+
*/
8726+
public void increaseRowCacheSeqNum() {
8727+
this.rowCacheSeqNum.incrementAndGet();
8728+
}
8729+
87128730
@Override
87138731
public Map<byte[], Long> getMaxStoreSeqId() {
87148732
return this.maxSeqIdInStores;

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 47 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,11 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
354354
public static final String REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG =
355355
"hbase.regionserver.bootstrap.nodes.executorService";
356356

357+
/**
358+
* The row cache service
359+
*/
360+
private final RowCacheService rowCacheService = new RowCacheService(getConfiguration());
361+
357362
/**
358363
* An Rpc callback for closing a RegionScanner.
359364
*/
@@ -668,7 +673,7 @@ private CheckAndMutateResult checkAndMutate(HRegion region, List<ClientProtos.Ac
668673
result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
669674
}
670675
if (result == null) {
671-
result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
676+
result = rowCacheService.checkAndMutate(region, checkAndMutate, nonceGroup, nonce);
672677
if (region.getCoprocessorHost() != null) {
673678
result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
674679
}
@@ -1020,7 +1025,8 @@ private void doBatchOp(final RegionActionResult.Builder builder, final HRegion r
10201025
Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
10211026
}
10221027

1023-
OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce);
1028+
OperationStatus[] codes =
1029+
rowCacheService.batchMutate(region, mArray, atomic, nonceGroup, nonce);
10241030

10251031
// When atomic is true, it indicates that the mutateRow API or the batch API with
10261032
// RowMutations is called. In this case, we need to merge the results of the
@@ -2336,6 +2342,11 @@ public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
23362342
@Override
23372343
public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
23382344
final BulkLoadHFileRequest request) throws ServiceException {
2345+
return rowCacheService.bulkLoadHFile(this, request);
2346+
}
2347+
2348+
BulkLoadHFileResponse bulkLoadHFileInternal(final BulkLoadHFileRequest request)
2349+
throws ServiceException {
23392350
long start = EnvironmentEdgeManager.currentTime();
23402351
List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
23412352
if (clusterIds.contains(this.server.getClusterId())) {
@@ -2592,8 +2603,7 @@ private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCal
25922603
RegionScannerImpl scanner = null;
25932604
long blockBytesScannedBefore = context.getBlockBytesScanned();
25942605
try {
2595-
scanner = region.getScanner(scan);
2596-
scanner.next(results);
2606+
scanner = rowCacheService.getScanner(region, scan, results);
25972607
} finally {
25982608
if (scanner != null) {
25992609
if (closeCallBack == null) {
@@ -3002,33 +3012,10 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque
30023012
builder.setMetrics(ProtobufUtil.toQueryMetrics(result.getMetrics()));
30033013
}
30043014
} else {
3005-
Result r = null;
3006-
Boolean processed = null;
3007-
MutationType type = mutation.getMutateType();
3008-
switch (type) {
3009-
case APPEND:
3010-
// TODO: this doesn't actually check anything.
3011-
r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
3012-
context);
3013-
break;
3014-
case INCREMENT:
3015-
// TODO: this doesn't actually check anything.
3016-
r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
3017-
context);
3018-
break;
3019-
case PUT:
3020-
put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
3021-
processed = Boolean.TRUE;
3022-
break;
3023-
case DELETE:
3024-
delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
3025-
processed = Boolean.TRUE;
3026-
break;
3027-
default:
3028-
throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
3029-
}
3030-
if (processed != null) {
3031-
builder.setProcessed(processed);
3015+
Result r = rowCacheService.mutate(this, region, mutation, quota, cellScanner, nonceGroup,
3016+
spaceQuotaEnforcement, context);
3017+
if (r == Result.EMPTY_RESULT) {
3018+
builder.setProcessed(true);
30323019
}
30333020
boolean clientCellBlockSupported = isClientCellBlockSupport(context);
30343021
addResult(builder, r, controller, clientCellBlockSupported);
@@ -3047,6 +3034,29 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque
30473034
}
30483035
}
30493036

3037+
Result mutateInternal(MutationProto mutation, HRegion region, OperationQuota quota,
3038+
CellScanner cellScanner, long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement,
3039+
RpcCallContext context) throws IOException {
3040+
MutationType type = mutation.getMutateType();
3041+
return switch (type) {
3042+
case APPEND ->
3043+
// TODO: this doesn't actually check anything.
3044+
append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement, context);
3045+
case INCREMENT ->
3046+
// TODO: this doesn't actually check anything.
3047+
increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement,
3048+
context);
3049+
case PUT -> {
3050+
put(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
3051+
yield Result.EMPTY_RESULT;
3052+
}
3053+
case DELETE -> {
3054+
delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement);
3055+
yield Result.EMPTY_RESULT;
3056+
}
3057+
};
3058+
}
3059+
30503060
private void put(HRegion region, OperationQuota quota, MutationProto mutation,
30513061
CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException {
30523062
long before = EnvironmentEdgeManager.currentTime();
@@ -3095,7 +3105,7 @@ private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota
30953105
result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate);
30963106
}
30973107
if (result == null) {
3098-
result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce);
3108+
result = rowCacheService.checkAndMutate(region, checkAndMutate, nonceGroup, nonce);
30993109
if (region.getCoprocessorHost() != null) {
31003110
result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result);
31013111
}
@@ -4079,4 +4089,9 @@ RegionScannerContext checkQuotaAndGetRegionScannerContext(ScanRequest request,
40794089
Pair<String, RegionScannerHolder> pair = newRegionScanner(request, region, builder);
40804090
return new RegionScannerContext(pair.getFirst(), pair.getSecond(), quota);
40814091
}
4092+
4093+
// For testing only
4094+
public RowCacheService getRowCacheService() {
4095+
return rowCacheService;
4096+
}
40824097
}

0 commit comments

Comments
 (0)