1 change: 1 addition & 0 deletions itests/src/test/resources/testconfiguration.properties
@@ -111,6 +111,7 @@ minillap.query.files=\
   intersect_distinct.q,\
   intersect_merge.q,\
   limit_bailout.q,\
+  llap_io_cache.q,\
   llap_nullscan.q,\
   llap_stats.q,\
   llap_udf.q,\
19 changes: 8 additions & 11 deletions ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
@@ -283,7 +283,10 @@ public DiskRangeList createCacheChunk(
     int chunkPartCount = largeBufCount + ((smallSize > 0) ? 1 : 0);
     DiskRange[] cacheRanges = new DiskRange[chunkPartCount];
     int extraOffsetInChunk = 0;
-    if (maxAlloc < chunkLength) {
+    newCacheData = new MemoryBuffer[chunkPartCount];
+    int index = 0;
+
+    if (largeBufCount > 0) {
       largeBuffers = new MemoryBuffer[largeBufCount];
       // Note: we don't use StoppableAllocator here - this is not on an IO thread.
       allocator.allocateMultiple(largeBuffers, maxAlloc, cache.getDataBufferFactory());
@@ -298,8 +301,10 @@ public DiskRangeList createCacheChunk(
         extraDiskDataOffset += remaining;
         extraOffsetInChunk += remaining;
       }
+      for (MemoryBuffer buf : largeBuffers) {
+        newCacheData[index++] = buf;
+      }
     }
-    newCacheData = largeBuffers;
     largeBuffers = null;
     if (smallSize > 0) {
       smallBuffer = new MemoryBuffer[1];
@@ -311,15 +316,7 @@ public DiskRangeList createCacheChunk(
           smallSize, bb, cacheRanges, largeBufCount, chunkFrom + extraOffsetInChunk);
       extraDiskDataOffset += smallSize;
       extraOffsetInChunk += smallSize; // Not strictly necessary, no one will look at it.
-      if (newCacheData == null) {
-        newCacheData = smallBuffer;
-      } else {
-        // TODO: add allocate overload with an offset and length
-        MemoryBuffer[] combinedCacheData = new MemoryBuffer[largeBufCount + 1];
-        System.arraycopy(newCacheData, 0, combinedCacheData, 0, largeBufCount);
-        newCacheData = combinedCacheData;
-        newCacheData[largeBufCount] = smallBuffer[0];
-      }
+      newCacheData[index] = smallBuffer[0];
       smallBuffer = null;
     }
     cache.putFileData(fileKey, cacheRanges, newCacheData, 0, tag);
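Reviewer note: the fix replaces the `maxAlloc < chunkLength` guard and the post-hoc array copy with a single preallocated `newCacheData` array filled through a running `index`. Below is a minimal, self-contained sketch of the part-count arithmetic that rests on; the derivations of `largeBufCount` and `smallSize` are assumptions (they are computed outside the hunks shown), and the class name and demo values are made up for illustration. On this reading, a chunk whose length exactly equals `maxAlloc` is the case the old guard missed: `largeBufCount` is 1 but `maxAlloc < chunkLength` is false, so nothing was ever placed in `newCacheData` before `putFileData`.

```java
// Hedged sketch of the splitting arithmetic behind the fix.
// ASSUMPTION: largeBufCount and smallSize are derived as below; the
// actual computation happens outside the hunks shown in this diff.
public class ChunkSplitSketch {

  static String describe(long chunkLength, long maxAlloc) {
    int largeBufCount = (int) (chunkLength / maxAlloc); // full-size buffers
    int smallSize = (int) (chunkLength % maxAlloc);     // trailing remainder
    int chunkPartCount = largeBufCount + ((smallSize > 0) ? 1 : 0); // as in the diff
    boolean oldGuard = maxAlloc < chunkLength; // old condition: misses exact fits
    boolean newGuard = largeBufCount > 0;      // new condition: covers them
    return chunkPartCount + " part(s), oldGuard=" + oldGuard + ", newGuard=" + newGuard;
  }

  public static void main(String[] args) {
    long maxAlloc = 16L * 1024 * 1024; // hive.llap.io.allocator.alloc.max=16Mb
    System.out.println(describe(maxAlloc, maxAlloc));      // 1 part(s), oldGuard=false, newGuard=true
    System.out.println(describe(maxAlloc + 10, maxAlloc)); // 2 part(s), both guards true
    System.out.println(describe(10, maxAlloc));            // 1 part(s), both guards false
  }
}
```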
24 changes: 24 additions & 0 deletions ql/src/test/queries/clientpositive/llap_io_cache.q
@@ -0,0 +1,24 @@
set hive.llap.io.enabled=true;
set hive.llap.io.memory.mode=cache;
set hive.llap.io.allocator.alloc.max=16Mb;
set hive.vectorized.execution.enabled=true;

CREATE TABLE tbl_parq (
id INT,
payload STRING
)
STORED AS PARQUET
TBLPROPERTIES (
'parquet.block.size'='16777216',
'parquet.page.size'='16777216',
'parquet.compression'='UNCOMPRESSED'
);

INSERT OVERWRITE TABLE tbl_parq
SELECT
1 AS id,
RPAD('x', 16777177, 'x') AS payload;

SELECT LENGTH(payload) FROM tbl_parq;

SELECT SUM(LENGTH(payload)) FROM tbl_parq;
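A note on the test's constants, as a hedged reading rather than anything the PR states: `parquet.page.size`, `parquet.block.size`, and `hive.llap.io.allocator.alloc.max` are all 16777216 bytes with compression disabled, and the single-row payload of 16777177 characters leaves 39 bytes of headroom, plausibly for Parquet page framing, so the chunk LLAP caches lands right at the allocator's maximum allocation: the exact-fit case the `LlapCacheAwareFs` change targets.

```java
// Hedged arithmetic behind the q-file's constants; the 39-byte
// framing interpretation is inferred, not stated in the PR.
public class LlapIoCacheTestNumbers {
  public static void main(String[] args) {
    long maxAlloc = 16L * 1024 * 1024; // alloc.max=16Mb -> 16777216 bytes
    long pageSize = 16_777_216L;       // parquet.page.size
    long payload  = 16_777_177L;       // RPAD('x', 16777177, 'x')
    System.out.println(pageSize == maxAlloc); // true: page cap == alloc cap
    System.out.println(maxAlloc - payload);   // 39 bytes of headroom
  }
}
```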
60 changes: 60 additions & 0 deletions ql/src/test/results/clientpositive/llap/llap_io_cache.q.out
@@ -0,0 +1,60 @@
PREHOOK: query: CREATE TABLE tbl_parq (
id INT,
payload STRING
)
STORED AS PARQUET
TBLPROPERTIES (
'parquet.block.size'='16777216',
'parquet.page.size'='16777216',
'parquet.compression'='UNCOMPRESSED'
)
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@tbl_parq
POSTHOOK: query: CREATE TABLE tbl_parq (
id INT,
payload STRING
)
STORED AS PARQUET
TBLPROPERTIES (
'parquet.block.size'='16777216',
'parquet.page.size'='16777216',
'parquet.compression'='UNCOMPRESSED'
)
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@tbl_parq
PREHOOK: query: INSERT OVERWRITE TABLE tbl_parq
SELECT
1 AS id,
RPAD('x', 16777177, 'x') AS payload
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@tbl_parq
POSTHOOK: query: INSERT OVERWRITE TABLE tbl_parq
SELECT
1 AS id,
RPAD('x', 16777177, 'x') AS payload
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_parq
POSTHOOK: Lineage: tbl_parq.id SIMPLE []
POSTHOOK: Lineage: tbl_parq.payload SIMPLE []
PREHOOK: query: SELECT LENGTH(payload) FROM tbl_parq
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_parq
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: SELECT LENGTH(payload) FROM tbl_parq
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_parq
POSTHOOK: Output: hdfs://### HDFS PATH ###
16777177
PREHOOK: query: SELECT SUM(LENGTH(payload)) FROM tbl_parq
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_parq
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: SELECT SUM(LENGTH(payload)) FROM tbl_parq
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_parq
POSTHOOK: Output: hdfs://### HDFS PATH ###
16777177