diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java index 817f9e2d4b1b..6d7786b5c88a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java @@ -316,4 +316,11 @@ default boolean matchReplicationScope(boolean enabled) { } return !enabled; } + + /** + * Checks whether row caching is enabled for this table. Note that row caching applies only at the + * entire row level, not at the column family level. + * @return {@code true} if row cache is enabled, otherwise {@code false} + */ + boolean isRowCacheEnabled(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java index 8636b006e83d..467cd6358430 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java @@ -227,6 +227,15 @@ public class TableDescriptorBuilder { private final static Map DEFAULT_VALUES = new HashMap<>(); private final static Set RESERVED_KEYWORDS = new HashSet<>(); + /** + * Used by HBase Shell interface to access this metadata attribute which denotes if the row cache + * is enabled. + */ + @InterfaceAudience.Private + public static final String ROW_CACHE_ENABLED = "ROW_CACHE_ENABLED"; + private static final Bytes ROW_CACHE_ENABLED_KEY = new Bytes(Bytes.toBytes(ROW_CACHE_ENABLED)); + private static final boolean DEFAULT_ROW_CACHE_ENABLED = false; + static { DEFAULT_VALUES.put(MAX_FILESIZE, String.valueOf(HConstants.DEFAULT_MAX_FILE_SIZE)); DEFAULT_VALUES.put(READONLY, String.valueOf(DEFAULT_READONLY)); @@ -236,6 +245,7 @@ public class TableDescriptorBuilder { DEFAULT_VALUES.put(PRIORITY, String.valueOf(DEFAULT_PRIORITY)); // Setting ERASURE_CODING_POLICY to NULL so that it is not considered as metadata DEFAULT_VALUES.put(ERASURE_CODING_POLICY, String.valueOf(DEFAULT_ERASURE_CODING_POLICY)); + DEFAULT_VALUES.put(ROW_CACHE_ENABLED, String.valueOf(DEFAULT_ROW_CACHE_ENABLED)); DEFAULT_VALUES.keySet().stream().map(s -> new Bytes(Bytes.toBytes(s))) .forEach(RESERVED_KEYWORDS::add); RESERVED_KEYWORDS.add(IS_META_KEY); @@ -565,6 +575,11 @@ public TableDescriptor build() { return new ModifyableTableDescriptor(desc); } + public TableDescriptorBuilder setRowCacheEnabled(boolean rowCacheEnabled) { + desc.setRowCacheEnabled(rowCacheEnabled); + return this; + } + private static final class ModifyableTableDescriptor implements TableDescriptor, Comparable { @@ -1510,6 +1525,15 @@ public Optional getRegionServerGroup() { return Optional.empty(); } } + + @Override + public boolean isRowCacheEnabled() { + return getOrDefault(ROW_CACHE_ENABLED_KEY, Boolean::valueOf, DEFAULT_ROW_CACHE_ENABLED); + } + + private ModifyableTableDescriptor setRowCacheEnabled(boolean enabled) { + return setValue(ROW_CACHE_ENABLED_KEY, Boolean.toString(enabled)); + } } /** diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 1051686d32e8..5381a4e3a18b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1017,6 +1017,12 @@ public enum OperationStatusCode { public static final float HFILE_BLOCK_CACHE_SIZE_DEFAULT = 0.4f; + /** + * Configuration key for the size of the row cache + */ + public static final String ROW_CACHE_SIZE_KEY = "row.cache.size"; + public static final float ROW_CACHE_SIZE_DEFAULT = 0.0f; + /** * Configuration key for the memory size of the block cache */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java index 7ada303d2939..6ad962a46348 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java @@ -93,25 +93,29 @@ public static void validateRegionServerHeapMemoryAllocation(Configuration conf) } float memStoreFraction = getGlobalMemStoreHeapPercent(conf, false); float blockCacheFraction = getBlockCacheHeapPercent(conf); + float rowCacheFraction = + conf.getFloat(HConstants.ROW_CACHE_SIZE_KEY, HConstants.ROW_CACHE_SIZE_DEFAULT); float minFreeHeapFraction = getRegionServerMinFreeHeapFraction(conf); int memStorePercent = (int) (memStoreFraction * 100); int blockCachePercent = (int) (blockCacheFraction * 100); + int rowCachePercent = (int) (rowCacheFraction * 100); int minFreeHeapPercent = (int) (minFreeHeapFraction * 100); - int usedPercent = memStorePercent + blockCachePercent; + int usedPercent = memStorePercent + blockCachePercent + rowCachePercent; int maxAllowedUsed = 100 - minFreeHeapPercent; if (usedPercent > maxAllowedUsed) { throw new RuntimeException(String.format( "RegionServer heap memory allocation is invalid: total memory usage exceeds 100%% " - + "(memStore + blockCache + requiredFreeHeap). " - + "Check the following configuration values:%n" + " - %s = %.2f%n" + " - %s = %s%n" - + " - %s = %s%n" + " - %s = %s", + + "(memStore + blockCache + rowCache + requiredFreeHeap). " + + "Check the following configuration values:" + "%n - %s = %.2f" + "%n - %s = %s" + + "%n - %s = %s" + "%n - %s = %s" + "%n - %s = %s", MEMSTORE_SIZE_KEY, memStoreFraction, HConstants.HFILE_BLOCK_CACHE_MEMORY_SIZE_KEY, conf.get(HConstants.HFILE_BLOCK_CACHE_MEMORY_SIZE_KEY), HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, conf.get(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY), HBASE_REGION_SERVER_FREE_HEAP_MIN_MEMORY_SIZE_KEY, - conf.get(HBASE_REGION_SERVER_FREE_HEAP_MIN_MEMORY_SIZE_KEY))); + conf.get(HBASE_REGION_SERVER_FREE_HEAP_MIN_MEMORY_SIZE_KEY), HConstants.ROW_CACHE_SIZE_KEY, + conf.get(HConstants.ROW_CACHE_SIZE_KEY))); } } @@ -313,4 +317,15 @@ public static long getBucketCacheSize(final Configuration conf) { } return (long) (bucketCacheSize * 1024 * 1024); } + + public static long getRowCacheSize(Configuration conf) { + long max = -1L; + final MemoryUsage usage = safeGetHeapMemoryUsage(); + if (usage != null) { + max = usage.getMax(); + } + float globalRowCachePercent = + conf.getFloat(HConstants.ROW_CACHE_SIZE_KEY, HConstants.ROW_CACHE_SIZE_DEFAULT); + return ((long) (max * globalRowCachePercent)); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9b7daee0f668..6b89f1807e4d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -65,6 +65,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -433,6 +434,11 @@ public MetricsTableRequests getMetricsTableRequests() { */ private long openSeqNum = HConstants.NO_SEQNUM; + /** + * Basically the same as openSeqNum, but it is updated when bulk load is done. + */ + private final AtomicLong rowCacheSeqNum = new AtomicLong(HConstants.NO_SEQNUM); + /** * The default setting for whether to enable on-demand CF loading for scan requests to this * region. Requests can override it. @@ -7881,6 +7887,7 @@ private HRegion openHRegion(final CancelableProgressable reporter) throws IOExce LOG.debug("checking classloading for " + this.getRegionInfo().getEncodedName()); TableDescriptorChecker.checkClassLoading(cConfig, htableDescriptor); this.openSeqNum = initialize(reporter); + this.rowCacheSeqNum.set(this.openSeqNum); this.mvcc.advanceTo(openSeqNum); // The openSeqNum must be increased every time when a region is assigned, as we rely on it to // determine whether a region has been successfully reopened. So here we always write open @@ -8709,6 +8716,17 @@ public long getOpenSeqNum() { return this.openSeqNum; } + public long getRowCacheSeqNum() { + return this.rowCacheSeqNum.get(); + } + + /** + * This is used to invalidate the row cache of the bulk-loaded region. + */ + public void increaseRowCacheSeqNum() { + this.rowCacheSeqNum.incrementAndGet(); + } + @Override public Map getMaxStoreSeqId() { return this.maxSeqIdInStores; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index fdfea375e096..8b10df5cd4f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -354,6 +354,11 @@ public class RSRpcServices extends HBaseRpcServicesBase public static final String REGIONSERVER_BOOTSTRAP_NODES_SERVICE_CONFIG = "hbase.regionserver.bootstrap.nodes.executorService"; + /** + * The row cache service + */ + private final RowCacheService rowCacheService = new RowCacheService(getConfiguration()); + /** * An Rpc callback for closing a RegionScanner. */ @@ -668,7 +673,7 @@ private CheckAndMutateResult checkAndMutate(HRegion region, List Row.COMPARATOR.compare(v1, v2)); } - OperationStatus[] codes = region.batchMutate(mArray, atomic, nonceGroup, nonce); + OperationStatus[] codes = + rowCacheService.batchMutate(region, mArray, atomic, nonceGroup, nonce); // When atomic is true, it indicates that the mutateRow API or the batch API with // RowMutations is called. In this case, we need to merge the results of the @@ -2336,6 +2342,11 @@ public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, @Override public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller, final BulkLoadHFileRequest request) throws ServiceException { + return rowCacheService.bulkLoadHFile(this, request); + } + + BulkLoadHFileResponse bulkLoadHFileInternal(final BulkLoadHFileRequest request) + throws ServiceException { long start = EnvironmentEdgeManager.currentTime(); List clusterIds = new ArrayList<>(request.getClusterIdsList()); if (clusterIds.contains(this.server.getClusterId())) { @@ -2592,8 +2603,7 @@ private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCal RegionScannerImpl scanner = null; long blockBytesScannedBefore = context.getBlockBytesScanned(); try { - scanner = region.getScanner(scan); - scanner.next(results); + scanner = rowCacheService.getScanner(region, scan, results); } finally { if (scanner != null) { if (closeCallBack == null) { @@ -3002,33 +3012,10 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque builder.setMetrics(ProtobufUtil.toQueryMetrics(result.getMetrics())); } } else { - Result r = null; - Boolean processed = null; - MutationType type = mutation.getMutateType(); - switch (type) { - case APPEND: - // TODO: this doesn't actually check anything. - r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement, - context); - break; - case INCREMENT: - // TODO: this doesn't actually check anything. - r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement, - context); - break; - case PUT: - put(region, quota, mutation, cellScanner, spaceQuotaEnforcement); - processed = Boolean.TRUE; - break; - case DELETE: - delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement); - processed = Boolean.TRUE; - break; - default: - throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); - } - if (processed != null) { - builder.setProcessed(processed); + Result r = rowCacheService.mutate(this, region, mutation, quota, cellScanner, nonceGroup, + spaceQuotaEnforcement, context); + if (r == Result.EMPTY_RESULT) { + builder.setProcessed(true); } boolean clientCellBlockSupported = isClientCellBlockSupport(context); addResult(builder, r, controller, clientCellBlockSupported); @@ -3047,6 +3034,29 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque } } + Result mutateInternal(MutationProto mutation, HRegion region, OperationQuota quota, + CellScanner cellScanner, long nonceGroup, ActivePolicyEnforcement spaceQuotaEnforcement, + RpcCallContext context) throws IOException { + MutationType type = mutation.getMutateType(); + return switch (type) { + case APPEND -> + // TODO: this doesn't actually check anything. + append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement, context); + case INCREMENT -> + // TODO: this doesn't actually check anything. + increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement, + context); + case PUT -> { + put(region, quota, mutation, cellScanner, spaceQuotaEnforcement); + yield Result.EMPTY_RESULT; + } + case DELETE -> { + delete(region, quota, mutation, cellScanner, spaceQuotaEnforcement); + yield Result.EMPTY_RESULT; + } + }; + } + private void put(HRegion region, OperationQuota quota, MutationProto mutation, CellScanner cellScanner, ActivePolicyEnforcement spaceQuota) throws IOException { long before = EnvironmentEdgeManager.currentTime(); @@ -3095,7 +3105,7 @@ private CheckAndMutateResult checkAndMutate(HRegion region, OperationQuota quota result = region.getCoprocessorHost().preCheckAndMutate(checkAndMutate); } if (result == null) { - result = region.checkAndMutate(checkAndMutate, nonceGroup, nonce); + result = rowCacheService.checkAndMutate(region, checkAndMutate, nonceGroup, nonce); if (region.getCoprocessorHost() != null) { result = region.getCoprocessorHost().postCheckAndMutate(checkAndMutate, result); } @@ -4079,4 +4089,9 @@ RegionScannerContext checkQuotaAndGetRegionScannerContext(ScanRequest request, Pair pair = newRegionScanner(request, region, builder); return new RegionScannerContext(pair.getFirst(), pair.getSecond(), quota); } + + // For testing only + public RowCacheService getRowCacheService() { + return rowCacheService; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCache.java new file mode 100644 index 000000000000..a977c7d59cf2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCache.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +/** + * Interface for caching rows retrieved by Get operations. + */ +@org.apache.yetus.audience.InterfaceAudience.Private +public interface RowCache { + /** + * Cache the specified row. + * @param key the key of the row to cache + * @param value the cells of the row to cache + */ + void cacheRow(RowCacheKey key, RowCells value); + + /** + * Evict the specified row. + * @param key the key of the row to evict + */ + void evictRow(RowCacheKey key); + + /** + * Evict all rows belonging to the specified region. This is heavy operation as it iterates the + * entire RowCache key set. + * @param region the region whose rows should be evicted + */ + void evictRowsByRegion(HRegion region); + + /** + * Get the number of rows in the cache. + * @return the number of rows in the cache + */ + long getCount(); + + /** + * Get the number of rows evicted from the cache. + * @return the number of rows evicted from the cache + */ + long getEvictedRowCount(); + + /** + * Get the hit count. + * @return the hit count + */ + long getHitCount(); + + /** + * Get the maximum size of the cache in bytes. + * @return the maximum size of the cache in bytes + */ + long getMaxSize(); + + /** + * Get the miss count. + * @return the miss count + */ + long getMissCount(); + + /** + * Get the specified row from the cache. + * @param key the key of the row to get + * @param caching whether caching is enabled for this request + * @return the cells of the row, or null if not found or caching is disabled + */ + RowCells getRow(RowCacheKey key, boolean caching); + + /** + * Get the current size of the cache in bytes. + * @return the current size of the cache in bytes + */ + long getSize(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheImpl.java new file mode 100644 index 000000000000..983046281cfc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheImpl.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +@org.apache.yetus.audience.InterfaceAudience.Private +public class RowCacheImpl implements RowCache { + RowCacheImpl(long maxSizeBytes) { + // TODO + } + + @Override + public void cacheRow(RowCacheKey key, RowCells value) { + // TODO + } + + @Override + public void evictRow(RowCacheKey key) { + // TODO + } + + @Override + public void evictRowsByRegion(HRegion region) { + // TODO + } + + @Override + public long getCount() { + // TODO + return 0; + } + + @Override + public long getEvictedRowCount() { + // TODO + return 0; + } + + @Override + public long getHitCount() { + // TODO + return 0; + } + + @Override + public long getMaxSize() { + // TODO + return 0; + } + + @Override + public long getMissCount() { + // TODO + return 0; + } + + @Override + public RowCells getRow(RowCacheKey key, boolean caching) { + // TODO + return null; + } + + @Override + public long getSize() { + // TODO + return 0; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheKey.java new file mode 100644 index 000000000000..09ec68194ea9 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheKey.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.Arrays; +import java.util.Objects; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; + +@org.apache.yetus.audience.InterfaceAudience.Private +public class RowCacheKey implements HeapSize { + public static final long FIXED_OVERHEAD = ClassSize.estimateBase(RowCacheKey.class, false); + + private final String encodedRegionName; + private final byte[] rowKey; + + // When a region is reopened or bulk-loaded, its rowCacheSeqNum is used to generate new keys that + // bypass the existing cache. This mechanism is effective when ROW_CACHE_EVICT_ON_CLOSE is set to + // false. + private final long rowCacheSeqNum; + + public RowCacheKey(HRegion region, byte[] rowKey) { + this.encodedRegionName = region.getRegionInfo().getEncodedName(); + this.rowKey = Objects.requireNonNull(rowKey, "rowKey cannot be null"); + this.rowCacheSeqNum = region.getRowCacheSeqNum(); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + RowCacheKey that = (RowCacheKey) o; + return rowCacheSeqNum == that.rowCacheSeqNum + && Objects.equals(encodedRegionName, that.encodedRegionName) + && Objects.deepEquals(rowKey, that.rowKey); + } + + @Override + public int hashCode() { + return Objects.hash(encodedRegionName, Arrays.hashCode(rowKey), rowCacheSeqNum); + } + + @Override + public String toString() { + return encodedRegionName + '_' + Bytes.toStringBinary(rowKey) + '_' + rowCacheSeqNum; + } + + @Override + public long heapSize() { + return FIXED_OVERHEAD + ClassSize.align(rowKey.length); + } + + boolean isSameRegion(HRegion region) { + return this.encodedRegionName.equals(region.getRegionInfo().getEncodedName()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheService.java new file mode 100644 index 000000000000..8975779bd599 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCacheService.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; +import org.apache.hadoop.hbase.ipc.RpcCallContext; +import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; +import org.apache.hadoop.hbase.quotas.OperationQuota; + +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; + +/** + * It is responsible for populating the row cache and retrieving rows from it. + */ +@org.apache.yetus.audience.InterfaceAudience.Private +public class RowCacheService { + private final boolean enabledByConf; + private final RowCache rowCache; + + RowCacheService(Configuration conf) { + enabledByConf = + conf.getFloat(HConstants.ROW_CACHE_SIZE_KEY, HConstants.ROW_CACHE_SIZE_DEFAULT) > 0; + rowCache = new RowCacheImpl(MemorySizeUtil.getRowCacheSize(conf)); + } + + OperationStatus[] batchMutate(HRegion region, Mutation[] mArray, boolean atomic, long nonceGroup, + long nonce) throws IOException { + return region.batchMutate(mArray, atomic, nonceGroup, nonce); + } + + BulkLoadHFileResponse bulkLoadHFile(RSRpcServices rsRpcServices, BulkLoadHFileRequest request) + throws ServiceException { + return rsRpcServices.bulkLoadHFileInternal(request); + } + + CheckAndMutateResult checkAndMutate(HRegion region, CheckAndMutate checkAndMutate, + long nonceGroup, long nonce) throws IOException { + return region.checkAndMutate(checkAndMutate, nonceGroup, nonce); + } + + RegionScannerImpl getScanner(HRegion region, Scan scan, List results) throws IOException { + RegionScannerImpl scanner = region.getScanner(scan); + scanner.next(results); + return scanner; + } + + Result mutate(RSRpcServices rsRpcServices, HRegion region, ClientProtos.MutationProto mutation, + OperationQuota quota, CellScanner cellScanner, long nonceGroup, + ActivePolicyEnforcement spaceQuotaEnforcement, RpcCallContext context) throws IOException { + return rsRpcServices.mutateInternal(mutation, region, quota, cellScanner, nonceGroup, + spaceQuotaEnforcement, context); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCells.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCells.java new file mode 100644 index 000000000000..2f44058e0a24 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowCells.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.util.ClassSize; + +@org.apache.yetus.audience.InterfaceAudience.Private +public class RowCells implements HeapSize { + public static final long FIXED_OVERHEAD = ClassSize.estimateBase(RowCells.class, false); + + private final List cells = new ArrayList<>(); + + public RowCells(List cells) throws CloneNotSupportedException { + for (Cell cell : cells) { + if (!(cell instanceof ExtendedCell extCell)) { + throw new CloneNotSupportedException("Cell is not an ExtendedCell"); + } + try { + // To garbage collect the objects referenced by the cells + this.cells.add(extCell.deepClone()); + } catch (RuntimeException e) { + // throw new CloneNotSupportedException("Deep clone failed"); + this.cells.add(extCell); + } + } + } + + @Override + public long heapSize() { + long cellsSize = cells.stream().mapToLong(Cell::heapSize).sum(); + return FIXED_OVERHEAD + cellsSize; + } + + public List getCells() { + return cells; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/util/TestMemorySizeUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/util/TestMemorySizeUtil.java index 5f00c34dbcb0..3cf1fc33753a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/util/TestMemorySizeUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/util/TestMemorySizeUtil.java @@ -52,6 +52,13 @@ public void testValidateRegionServerHeapMemoryAllocation() { assertEquals(HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD, 0.2f, 0.0f); MemorySizeUtil.validateRegionServerHeapMemoryAllocation(conf); + // when memstore size + block cache size + row cache size + default free heap min size == 1.0 + conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.4f); + conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.39f); + conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.01f); + assertEquals(HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD, 0.2f, 0.0f); + MemorySizeUtil.validateRegionServerHeapMemoryAllocation(conf); + // when memstore size + block cache size + default free heap min size > 1.0 conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.5f); assertThrows(RuntimeException.class, @@ -60,6 +67,7 @@ public void testValidateRegionServerHeapMemoryAllocation() { // when free heap min size is set to 0, it should not throw an exception conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.5f); conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.5f); + conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, 0.0f); conf.setLong(MemorySizeUtil.HBASE_REGION_SERVER_FREE_HEAP_MIN_MEMORY_SIZE_KEY, 0L); MemorySizeUtil.validateRegionServerHeapMemoryAllocation(conf); @@ -86,4 +94,14 @@ public void testGetRegionServerMinFreeHeapFraction() { minFreeHeapFraction = MemorySizeUtil.getRegionServerMinFreeHeapFraction(conf); assertEquals(0.0f, minFreeHeapFraction, 0.0f); } + + @Test + public void testGetRowCacheSize() { + float rowCacheSizeRatio = 0.01f; + conf.setFloat(HConstants.ROW_CACHE_SIZE_KEY, rowCacheSizeRatio); + long rowCacheSizeBytes = MemorySizeUtil.getRowCacheSize(conf); + + long maxMemory = MemorySizeUtil.safeGetHeapMemoryUsage().getMax(); + assertEquals((long) (maxMemory * rowCacheSizeRatio), rowCacheSizeBytes); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheKey.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheKey.java new file mode 100644 index 000000000000..ee75fd251924 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCacheKey.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotSame; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category({ IOTests.class, SmallTests.class }) +public class TestRowCacheKey { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowCacheKey.class); + + private static HRegion region1; + private static HRegion region2; + private static RegionInfo regionInfo1; + + @BeforeClass + public static void beforeClass() { + TableName tableName = TableName.valueOf("table1"); + + regionInfo1 = Mockito.mock(RegionInfo.class); + Mockito.when(regionInfo1.getEncodedName()).thenReturn("region1"); + Mockito.when(regionInfo1.getTable()).thenReturn(tableName); + + region1 = Mockito.mock(HRegion.class); + Mockito.when(region1.getRegionInfo()).thenReturn(regionInfo1); + + RegionInfo regionInfo2 = Mockito.mock(RegionInfo.class); + Mockito.when(regionInfo2.getEncodedName()).thenReturn("region2"); + Mockito.when(regionInfo2.getTable()).thenReturn(tableName); + + region2 = Mockito.mock(HRegion.class); + Mockito.when(region2.getRegionInfo()).thenReturn(regionInfo2); + } + + @Test + public void testEquality() { + RowCacheKey key11 = new RowCacheKey(region1, "row1".getBytes()); + RowCacheKey key12 = new RowCacheKey(region1, "row2".getBytes()); + RowCacheKey key21 = new RowCacheKey(region2, "row1".getBytes()); + RowCacheKey key22 = new RowCacheKey(region2, "row2".getBytes()); + RowCacheKey key11Another = new RowCacheKey(region1, "row1".getBytes()); + assertNotSame(key11, key11Another); + + // Ensure hashCode works well + assertNotEquals(key11.hashCode(), key12.hashCode()); + assertNotEquals(key11.hashCode(), key21.hashCode()); + assertNotEquals(key11.hashCode(), key22.hashCode()); + assertEquals(key11.hashCode(), key11Another.hashCode()); + + // Ensure equals works well + assertNotEquals(key11, key12); + assertNotEquals(key11, key21); + assertNotEquals(key11, key22); + assertEquals(key11, key11Another); + } + + @Test + public void testDifferentRowCacheSeqNum() { + RowCacheKey key1 = new RowCacheKey(region1, "row1".getBytes()); + + HRegion region1Another = Mockito.mock(HRegion.class); + Mockito.when(region1Another.getRegionInfo()).thenReturn(regionInfo1); + Mockito.when(region1Another.getRowCacheSeqNum()).thenReturn(1L); + RowCacheKey key1Another = new RowCacheKey(region1Another, "row1".getBytes()); + + assertNotEquals(key1.hashCode(), key1Another.hashCode()); + assertNotEquals(key1, key1Another); + } + + @Test + public void testHeapSize() { + RowCacheKey key; + long base = RowCacheKey.FIXED_OVERHEAD; + + key = new RowCacheKey(region1, "1".getBytes()); + assertEquals(base + 8, key.heapSize()); + + key = new RowCacheKey(region1, "12345678".getBytes()); + assertEquals(base + 8, key.heapSize()); + + key = new RowCacheKey(region1, "123456789".getBytes()); + assertEquals(base + 8 * 2, key.heapSize()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCells.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCells.java new file mode 100644 index 000000000000..6307bbce3059 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRowCells.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; + +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueTestUtil; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestRowCells { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRowCells.class); + + @Test + public void testDeepClone() throws CloneNotSupportedException { + List cells = new ArrayList<>(); + KeyValue kv1 = KeyValueTestUtil.create("row", "CF", "q1", 1, "v1"); + cells.add(kv1); + KeyValue kv2 = KeyValueTestUtil.create("row", "CF", "q12", 2, "v22"); + cells.add(kv2); + RowCells rowCells = new RowCells(cells); + + // Ensure deep clone happened + assertNotSame(kv1, rowCells.getCells().get(0)); + assertEquals(kv1, rowCells.getCells().get(0)); + assertNotSame(kv2, rowCells.getCells().get(1)); + assertEquals(kv2, rowCells.getCells().get(1)); + } + + @Test + public void testHeapSize() throws CloneNotSupportedException { + List cells; + RowCells rowCells; + + cells = new ArrayList<>(); + rowCells = new RowCells(cells); + assertEquals(RowCells.FIXED_OVERHEAD, rowCells.heapSize()); + + cells = new ArrayList<>(); + KeyValue kv1 = KeyValueTestUtil.create("row", "CF", "q1", 1, "v1"); + cells.add(kv1); + KeyValue kv2 = KeyValueTestUtil.create("row", "CF", "q22", 2, "v22"); + cells.add(kv2); + rowCells = new RowCells(cells); + assertEquals(RowCells.FIXED_OVERHEAD + kv1.heapSize() + kv2.heapSize(), rowCells.heapSize()); + } +} diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index 93cc312338c9..f4d14eab7d19 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -1610,6 +1610,7 @@ def update_tdb_from_arg(tdb, arg) tdb.setRegionMemStoreReplication(JBoolean.valueOf(arg.delete(TableDescriptorBuilder::REGION_MEMSTORE_REPLICATION))) if arg.include?(TableDescriptorBuilder::REGION_MEMSTORE_REPLICATION) tdb.setRegionSplitPolicyClassName(arg.delete(TableDescriptorBuilder::SPLIT_POLICY)) if arg.include?(TableDescriptorBuilder::SPLIT_POLICY) tdb.setRegionReplication(JInteger.valueOf(arg.delete(TableDescriptorBuilder::REGION_REPLICATION))) if arg.include?(TableDescriptorBuilder::REGION_REPLICATION) + tdb.setRowCacheEnabled(JBoolean.valueOf(arg.delete(TableDescriptorBuilder::ROW_CACHE_ENABLED))) if arg.include?(TableDescriptorBuilder::ROW_CACHE_ENABLED) set_user_metadata(tdb, arg.delete(METADATA)) if arg[METADATA] set_descriptor_config(tdb, arg.delete(CONFIGURATION)) if arg[CONFIGURATION] end