1 change: 1 addition & 0 deletions itests/src/test/resources/testconfiguration.properties
@@ -111,6 +111,7 @@ minillap.query.files=\
 intersect_distinct.q,\
 intersect_merge.q,\
 limit_bailout.q,\
+llap_io_cache.q,\
 llap_nullscan.q,\
 llap_stats.q,\
 llap_udf.q,\
19 changes: 8 additions & 11 deletions ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
@@ -283,7 +283,10 @@ public DiskRangeList createCacheChunk(
 int chunkPartCount = largeBufCount + ((smallSize > 0) ? 1 : 0);
 DiskRange[] cacheRanges = new DiskRange[chunkPartCount];
 int extraOffsetInChunk = 0;
-if (maxAlloc < chunkLength) {
+newCacheData = new MemoryBuffer[chunkPartCount];
+int index = 0;
+
+if (largeBufCount > 0) {
   largeBuffers = new MemoryBuffer[largeBufCount];
   // Note: we don't use StoppableAllocator here - this is not on an IO thread.
   allocator.allocateMultiple(largeBuffers, maxAlloc, cache.getDataBufferFactory());
@@ -298,8 +301,10 @@ public DiskRangeList createCacheChunk(
     extraDiskDataOffset += remaining;
     extraOffsetInChunk += remaining;
   }
+  for (MemoryBuffer buf : largeBuffers) {
+    newCacheData[index++] = buf;
+  }
 }
-newCacheData = largeBuffers;
 largeBuffers = null;
 if (smallSize > 0) {
   smallBuffer = new MemoryBuffer[1];
@@ -311,15 +316,7 @@ public DiskRangeList createCacheChunk(
       smallSize, bb, cacheRanges, largeBufCount, chunkFrom + extraOffsetInChunk);
   extraDiskDataOffset += smallSize;
   extraOffsetInChunk += smallSize; // Not strictly necessary, no one will look at it.
-  if (newCacheData == null) {
-    newCacheData = smallBuffer;
-  } else {
-    // TODO: add allocate overload with an offset and length
-    MemoryBuffer[] combinedCacheData = new MemoryBuffer[largeBufCount + 1];
-    System.arraycopy(newCacheData, 0, combinedCacheData, 0, largeBufCount);
-    newCacheData = combinedCacheData;
-    newCacheData[largeBufCount] = smallBuffer[0];
-  }
+  newCacheData[index] = smallBuffer[0];
   smallBuffer = null;
 }
 cache.putFileData(fileKey, cacheRanges, newCacheData, 0, tag);
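The shape of the fix: instead of conditionally building newCacheData from largeBuffers and then splicing in the small buffer with an arraycopy, the combined array is preallocated at chunkPartCount and filled through a running index. The boundary condition is easiest to see with concrete numbers; below is a minimal, self-contained sketch (all names are illustrative, and the derivation of largeBufCount/smallSize from chunkLength is an assumption, not copied from the Hive sources). When a chunk is exactly one allocator-max long, the old guard maxAlloc < chunkLength is false, so largeBuffers was never allocated and newCacheData = largeBuffers left the combined array null; the new guard largeBufCount > 0 covers that case.

```java
// Illustrative sketch of the chunk-splitting arithmetic around the fixed guard.
// Names and the largeBufCount/smallSize derivation are assumptions for clarity.
public class ChunkGuardSketch {
  public static void main(String[] args) {
    long maxAlloc = 16L * 1024 * 1024;  // hive.llap.io.allocator.alloc.max=16Mb
    long chunkLength = maxAlloc;        // chunk is exactly one max-sized allocation

    int largeBufCount = (int) (chunkLength / maxAlloc);             // 1
    long smallSize = chunkLength % maxAlloc;                        // 0
    int chunkPartCount = largeBufCount + ((smallSize > 0) ? 1 : 0); // 1

    // Old guard: false here, so no large buffer was allocated even though
    // the chunk has one max-sized part; the combined array ended up null.
    boolean oldGuard = maxAlloc < chunkLength;
    // New guard: true, so the buffer is allocated and placed at index 0.
    boolean newGuard = largeBufCount > 0;

    System.out.printf("parts=%d oldGuard=%b newGuard=%b%n",
        chunkPartCount, oldGuard, newGuard);
  }
}
```

Preallocating at chunkPartCount also removes the old else branch's combinedCacheData copy, so the small-buffer slot is simply newCacheData[index].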
26 changes: 26 additions & 0 deletions ql/src/test/queries/clientpositive/llap_io_cache.q
@@ -0,0 +1,26 @@
set hive.llap.io.enabled=true;
set hive.llap.io.memory.mode=cache;
set hive.llap.io.allocator.alloc.max=16Mb;
set hive.vectorized.execution.enabled=true;

DROP TABLE IF EXISTS tbl_parq;

CREATE TABLE tbl_parq (
id INT,
payload STRING
)
STORED AS PARQUET
TBLPROPERTIES (
'parquet.block.size'='16777216',
'parquet.page.size'='16777216',
'parquet.compression'='UNCOMPRESSED'
);

INSERT INTO TABLE tbl_parq
SELECT
1 AS id,
RPAD('x', 16777177, 'x') AS payload;

SELECT LENGTH(payload) FROM tbl_parq;

SELECT SUM(LENGTH(payload)) FROM tbl_parq;
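The constants in this q-file appear chosen to drive a read straight into that edge case: parquet.page.size equals the 16 MiB allocator maximum, the data is uncompressed, and the single-row payload of 16,777,177 characters leaves only 39 bytes of headroom under 16,777,216, presumably just enough for page and value framing so the page read lands at the allocator max (largeBufCount == 1, smallSize == 0). A back-of-envelope check, where the framing interpretation is an assumption:

```java
// Back-of-envelope check of the test constants; the claim that the 39-byte
// headroom covers Parquet page/value framing is an assumption, not a fact
// read out of the Parquet writer.
public class TestSizingSketch {
  public static void main(String[] args) {
    long allocMax = 16L * 1024 * 1024;  // hive.llap.io.allocator.alloc.max=16Mb
    long pageSize = 16_777_216L;        // parquet.page.size / parquet.block.size
    long payload  = 16_777_177L;        // RPAD('x', 16777177, 'x')

    System.out.println("pageSize == allocMax: " + (pageSize == allocMax)); // true
    System.out.println("headroom for framing: " + (pageSize - payload));   // 39
  }
}
```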
66 changes: 66 additions & 0 deletions ql/src/test/results/clientpositive/llap/llap_io_cache.q.out
@@ -0,0 +1,66 @@
PREHOOK: query: DROP TABLE IF EXISTS tbl_parq
PREHOOK: type: DROPTABLE
PREHOOK: Output: database:default
POSTHOOK: query: DROP TABLE IF EXISTS tbl_parq
POSTHOOK: type: DROPTABLE
POSTHOOK: Output: database:default
PREHOOK: query: CREATE TABLE tbl_parq (
id INT,
payload STRING
)
STORED AS PARQUET
TBLPROPERTIES (
'parquet.block.size'='16777216',
'parquet.page.size'='16777216',
'parquet.compression'='UNCOMPRESSED'
)
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@tbl_parq
POSTHOOK: query: CREATE TABLE tbl_parq (
id INT,
payload STRING
)
STORED AS PARQUET
TBLPROPERTIES (
'parquet.block.size'='16777216',
'parquet.page.size'='16777216',
'parquet.compression'='UNCOMPRESSED'
)
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@tbl_parq
PREHOOK: query: INSERT INTO TABLE tbl_parq
SELECT
1 AS id,
RPAD('x', 16777177, 'x') AS payload
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@tbl_parq
POSTHOOK: query: INSERT INTO TABLE tbl_parq
SELECT
1 AS id,
RPAD('x', 16777177, 'x') AS payload
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_parq
POSTHOOK: Lineage: tbl_parq.id SIMPLE []
POSTHOOK: Lineage: tbl_parq.payload SIMPLE []
PREHOOK: query: SELECT LENGTH(payload) FROM tbl_parq
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_parq
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: SELECT LENGTH(payload) FROM tbl_parq
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_parq
POSTHOOK: Output: hdfs://### HDFS PATH ###
16777177
PREHOOK: query: SELECT SUM(LENGTH(payload)) FROM tbl_parq
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_parq
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: SELECT SUM(LENGTH(payload)) FROM tbl_parq
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_parq
POSTHOOK: Output: hdfs://### HDFS PATH ###
16777177