From fdb0f9b5526880960f7ee329b3c9866cfc50fb36 Mon Sep 17 00:00:00 2001 From: Orri Erling Date: Tue, 16 Jul 2024 08:28:18 -0700 Subject: [PATCH] Test different Wave batch sizes (#10458) Summary: Tests and fixes special cases with partially filled ends of batches with multiple concurrent streams per driver. Adds an optional guard for ends of wave::Buffers and internal consistency checks for GpuArena. These may catch writes past end. Pull Request resolved: https://github.com/facebookincubator/velox/pull/10458 Reviewed By: Yuhta Differential Revision: D59713759 Pulled By: oerling fbshipit-source-id: 9b17ffc947f54fd2cd9a20b87e6764c6c6b91542 --- velox/exec/tests/utils/QueryAssertions.cpp | 8 ++- velox/experimental/wave/common/Buffer.cpp | 32 ++++++++++ velox/experimental/wave/common/Buffer.h | 24 ++++++++ velox/experimental/wave/common/GpuArena.cpp | 59 ++++++++++++++++--- velox/experimental/wave/common/GpuArena.h | 20 ++++++- velox/experimental/wave/dwio/FormatData.cpp | 5 +- velox/experimental/wave/dwio/ReadStream.cpp | 15 +++-- velox/experimental/wave/exec/ExprKernel.cu | 44 +------------- velox/experimental/wave/exec/ExprKernel3.cu | 12 ++++ velox/experimental/wave/exec/Wave.cpp | 2 +- velox/experimental/wave/exec/WaveCore.cuh | 27 +++++++++ .../wave/exec/tests/TableScanTest.cpp | 33 ++++++++--- velox/experimental/wave/vector/WaveVector.cpp | 17 ++++-- 13 files changed, 221 insertions(+), 77 deletions(-) diff --git a/velox/exec/tests/utils/QueryAssertions.cpp b/velox/exec/tests/utils/QueryAssertions.cpp index 60519397ebf8..437b5a685927 100644 --- a/velox/exec/tests/utils/QueryAssertions.cpp +++ b/velox/exec/tests/utils/QueryAssertions.cpp @@ -27,6 +27,8 @@ using facebook::velox::duckdb::duckdbTimestampToVelox; using facebook::velox::duckdb::veloxTimestampToDuckDB; +DEFINE_int32(max_error_rows, 10, "Max number of listed error rows"); + namespace facebook::velox::exec::test { namespace { @@ -676,7 +678,8 @@ std::string makeErrorMessage( message << extraRows.size() << " 
extra rows, " << missingRows.size() << " missing rows" << std::endl; - auto extraRowsToPrint = std::min((size_t)10, extraRows.size()); + auto extraRowsToPrint = + std::min((size_t)FLAGS_max_error_rows, extraRows.size()); message << extraRowsToPrint << " of extra rows:" << std::endl; for (int32_t i = 0; i < extraRowsToPrint; i++) { @@ -686,7 +689,8 @@ std::string makeErrorMessage( } message << std::endl; - auto missingRowsToPrint = std::min((size_t)10, missingRows.size()); + auto missingRowsToPrint = + std::min((size_t)FLAGS_max_error_rows, missingRows.size()); message << missingRowsToPrint << " of missing rows:" << std::endl; for (int32_t i = 0; i < missingRowsToPrint; i++) { message << "\t"; diff --git a/velox/experimental/wave/common/Buffer.cpp b/velox/experimental/wave/common/Buffer.cpp index 53552a04d153..3af8d128c039 100644 --- a/velox/experimental/wave/common/Buffer.cpp +++ b/velox/experimental/wave/common/Buffer.cpp @@ -15,11 +15,43 @@ */ #include "velox/experimental/wave/common/Buffer.h" +#include "velox/common/base/Exceptions.h" +#include "velox/common/base/SuccinctPrinter.h" +#include "velox/experimental/wave/common/Exception.h" #include "velox/experimental/wave/common/GpuArena.h" +DECLARE_bool(wave_buffer_end_guard); + namespace facebook::velox::wave { +void Buffer::check() const { + if (!FLAGS_wave_buffer_end_guard) { + return; + } + if (*magicPtr() != kMagic) { + VELOX_FAIL("Buffer tail overrun: {}", toString()); + } +} + +void Buffer::setMagic() { + if (!FLAGS_wave_buffer_end_guard) { + return; + } + *magicPtr() = kMagic; +} + +std::string Buffer::toString() const { + return fmt::format( + "", + ptr_, + capacity_, + referenceCount_, + pinCount_, + debugInfo_); +} + void Buffer::release() { + check(); if (referenceCount_.fetch_sub(1) == 1) { arena_->free(this); } diff --git a/velox/experimental/wave/common/Buffer.h b/velox/experimental/wave/common/Buffer.h index a205e173ffcc..0f0bddabac7b 100644 --- a/velox/experimental/wave/common/Buffer.h +++ 
b/velox/experimental/wave/common/Buffer.h @@ -18,6 +18,7 @@ #include #include #include +#include namespace facebook::velox::wave { @@ -75,7 +76,29 @@ class Buffer { virtual void release(); + const std::string& debugInfo() const { + return debugInfo_; + } + + void setDebugInfo(std::string info) { + debugInfo_ = std::move(info); + } + + /// Checks consistency of magic numbers. Throws on error. + void check() const; + + std::string toString() const; + protected: + static constexpr int64_t kMagic = 0xe0be0be0be0be0b; + + int64_t* magicPtr() const { + return reinterpret_cast( + reinterpret_cast(ptr_) + capacity_); + } + + void setMagic(); + // Number of WaveBufferPtrs referencing 'this'. std::atomic referenceCount_{0}; @@ -97,6 +120,7 @@ class Buffer { // The containeing arena. GpuArena* arena_{nullptr}; + std::string debugInfo_; friend class GpuArena; }; diff --git a/velox/experimental/wave/common/GpuArena.cpp b/velox/experimental/wave/common/GpuArena.cpp index bedd472fd5d9..5605b6428617 100644 --- a/velox/experimental/wave/common/GpuArena.cpp +++ b/velox/experimental/wave/common/GpuArena.cpp @@ -21,9 +21,17 @@ #include "velox/common/base/SuccinctPrinter.h" #include "velox/experimental/wave/common/Exception.h" +DEFINE_bool( + wave_buffer_end_guard, + false, + "Use buffer end guard to detect overruns"); + namespace facebook::velox::wave { uint64_t GpuSlab::roundBytes(uint64_t bytes) { + if (FLAGS_wave_buffer_end_guard) { + bytes += sizeof(int64_t); + } return bits::nextPowerOfTwo(std::max(16, bytes)); } @@ -287,7 +295,7 @@ GpuArena::GpuArena(uint64_t singleArenaCapacity, GpuAllocator* allocator) currentArena_ = arena; } -WaveBufferPtr GpuArena::getBuffer(void* ptr, size_t size) { +WaveBufferPtr GpuArena::getBuffer(void* ptr, size_t capacity, size_t size) { auto result = firstFreeBuffer_; if (!result) { allBuffers_.push_back(std::make_unique()); @@ -303,39 +311,41 @@ WaveBufferPtr GpuArena::getBuffer(void* ptr, size_t size) { result->arena_ = this; result->ptr_ = ptr; 
result->size_ = size; - result->capacity_ = size; + result->capacity_ = capacity; + result->setMagic(); return result; } WaveBufferPtr GpuArena::allocateBytes(uint64_t bytes) { - bytes = GpuSlab::roundBytes(bytes); + auto roundedBytes = GpuSlab::roundBytes(bytes); std::lock_guard l(mutex_); - auto* result = currentArena_->allocate(bytes); + auto* result = currentArena_->allocate(roundedBytes); if (result != nullptr) { - return getBuffer(result, bytes); + return getBuffer(result, bytes, roundedBytes); } for (auto pair : arenas_) { - if (pair.second == currentArena_ || pair.second->freeBytes() < bytes) { + if (pair.second == currentArena_ || + pair.second->freeBytes() < roundedBytes) { continue; } result = pair.second->allocate(bytes); if (result) { currentArena_ = pair.second; - return getBuffer(result, bytes); + return getBuffer(result, bytes, roundedBytes); } } // If first allocation fails we create a new GpuSlab for another attempt. If // it ever fails again then it means requested bytes is larger than a single // GpuSlab's capacity. No further attempts will happen. 
- auto arenaBytes = std::max(singleArenaCapacity_, bytes); + auto arenaBytes = std::max(singleArenaCapacity_, roundedBytes); auto newArena = std::make_shared( allocator_->allocate(arenaBytes), arenaBytes, allocator_); arenas_.emplace(reinterpret_cast(newArena->address()), newArena); currentArena_ = newArena; result = currentArena_->allocate(bytes); if (result) { - return getBuffer(result, bytes); + return getBuffer(result, bytes, roundedBytes); } VELOX_FAIL("Failed to allocate {} bytes of universal address space", bytes); } @@ -364,4 +374,35 @@ void GpuArena::free(Buffer* buffer) { firstFreeBuffer_ = buffer; } +ArenaStatus GpuArena::checkBuffers() { + ArenaStatus status; + std::lock_guard l(mutex_); + std::vector usedBuffers; + for (auto& buffers : allBuffers_) { + for (auto& buffer : buffers->buffers) { + if (buffer.referenceCount_) { + ++status.numBuffers; + status.capacity += buffer.capacity_; + status.allocatedBytes += buffer.size_; + buffer.check(); + usedBuffers.push_back(&buffer); + } + } + } + std::sort( + usedBuffers.begin(), + usedBuffers.end(), + [](Buffer*& left, Buffer*& right) { + return (uintptr_t)left->ptr_ < (uintptr_t)right->ptr_; + }); + for (auto i = 0; i < usedBuffers.size() - 1; ++i) { + void* end = + reinterpret_cast(usedBuffers[i]->ptr_) + usedBuffers[i]->size_; + if (end > usedBuffers[i + 1]->ptr_) { + VELOX_FAIL("Overlapping buffers in GpuArena"); + } + } + return status; +} + } // namespace facebook::velox::wave diff --git a/velox/experimental/wave/common/GpuArena.h b/velox/experimental/wave/common/GpuArena.h index 393899d9338c..1b03ce84a7ac 100644 --- a/velox/experimental/wave/common/GpuArena.h +++ b/velox/experimental/wave/common/GpuArena.h @@ -114,6 +114,17 @@ class GpuSlab { GpuAllocator* const allocator_; }; +struct ArenaStatus { + /// Number of allocated Buffers. + int32_t numBuffers{0}; + + /// Sum of capacity of allocated buffers. + int64_t capacity{}; + + /// Currently used bytes. Larger than capacity because of padding. 
+ int64_t allocatedBytes{0}; +}; + /// A class that manages a set of GpuSlabs. It is able to adapt itself by /// growing the number of its managed GpuSlab's when extreme memory /// fragmentation happens. @@ -141,6 +152,10 @@ class GpuArena { return arenas_; } + /// Checks magic numbers and returns the sum of allocated capacity. Actual + /// sizes are padded to larger. + ArenaStatus checkBuffers(); + private: // A preallocated array of Buffer handles for memory of 'this'. struct Buffers { @@ -149,8 +164,9 @@ class GpuArena { }; // Returns a new reference counting pointer to a new Buffer initialized to - // 'ptr' and 'size'. - WaveBufferPtr getBuffer(void* ptr, size_t size); + // 'ptr' and 'size'. 'size' is the size to free, 'capacity' is the usable size + // excluding magic numbers and padding. + WaveBufferPtr getBuffer(void* ptr, size_t capacity, size_t size); // Serializes all activity in 'this'. std::mutex mutex_; diff --git a/velox/experimental/wave/dwio/FormatData.cpp b/velox/experimental/wave/dwio/FormatData.cpp index 925a8fddd7aa..aa588ed5b2d3 100644 --- a/velox/experimental/wave/dwio/FormatData.cpp +++ b/velox/experimental/wave/dwio/FormatData.cpp @@ -105,6 +105,7 @@ std::unique_ptr FormatData::makeStep( WaveTypeKind columnKind, int32_t blockIdx) { auto rowsPerBlock = FLAGS_wave_reader_rows_per_tb; + auto maxRowsPerThread = (rowsPerBlock / kBlockSize); int32_t numBlocks = bits::roundUp(op.rows.size(), rowsPerBlock) / rowsPerBlock; @@ -116,7 +117,7 @@ std::unique_ptr FormatData::makeStep( step->nonNullBases = grid_.numNonNull; step->nulls = grid_.nulls; } - step->numRowsPerThread = rowsPerBlock / kBlockSize; + step->numRowsPerThread = bits::roundUp(rowsInBlock, kBlockSize) / kBlockSize; setFilter(step.get(), op.reader, nullptr); bool dense = previousFilter == nullptr && simd::isDense(op.rows.data(), op.rows.size()); @@ -136,7 +137,7 @@ std::unique_ptr FormatData::makeStep( op.extraRowCountId, &op.extraRowCount, true); } else { step->filterRowCount = 
reinterpret_cast( - blockIdx * sizeof(int32_t) * step->numRowsPerThread); + blockIdx * sizeof(int32_t) * maxRowsPerThread); deviceStaging.registerPointer( op.extraRowCountId, &step->filterRowCount, false); } diff --git a/velox/experimental/wave/dwio/ReadStream.cpp b/velox/experimental/wave/dwio/ReadStream.cpp index 85b7c9a008e2..e961f1929d6a 100644 --- a/velox/experimental/wave/dwio/ReadStream.cpp +++ b/velox/experimental/wave/dwio/ReadStream.cpp @@ -61,6 +61,7 @@ ReadStream::ReadStream( void ReadStream::setBlockStatusAndTemp() { auto* status = control_->deviceData->as(); + auto maxRowsPerThread = FLAGS_wave_reader_rows_per_tb / kBlockSize; auto tempSize = programs_.programs[0][0]->tempSize(); auto size = programs_.programs.size() * tempSize; auto id = deviceStaging_.reserve(size); @@ -69,7 +70,7 @@ void ReadStream::setBlockStatusAndTemp() { for (auto& op : program) { op->temp = reinterpret_cast(blockIdx * tempSize); deviceStaging_.registerPointer(id, &op->temp, false); - op->blockStatus = status + op->numRowsPerThread * op->nthBlock; + op->blockStatus = status + maxRowsPerThread * op->nthBlock; } } } @@ -119,21 +120,23 @@ void ReadStream::makeGrid(Stream* stream) { void ReadStream::makeCompact(bool isSerial) { auto rowsPerBlock = FLAGS_wave_reader_rows_per_tb; - auto numRowsPerThread = FLAGS_wave_reader_rows_per_tb / kBlockSize; + auto maxRowsPerThread = FLAGS_wave_reader_rows_per_tb / kBlockSize; for (int32_t i = 0; i < static_cast(filters_.size()) - 1; ++i) { if (filters_[i].waveVector) { int32_t numTBs = - bits::roundUp(numBlocks_, numRowsPerThread) / numRowsPerThread; + bits::roundUp(numBlocks_, maxRowsPerThread) / maxRowsPerThread; for (auto blockIdx = 0; blockIdx < numTBs; ++blockIdx) { auto step = std::make_unique(); step->step = DecodeStep::kCompact64; step->nthBlock = blockIdx; - step->numRowsPerThread = numRowsPerThread; + step->numRowsPerThread = blockIdx == numTBs - 1 + ? 
numBlocks_ - (numTBs - 1) * maxRowsPerThread + : maxRowsPerThread; if (filters_.back().deviceResult) { step->data.compact.finalRows = filters_.back().deviceResult + blockIdx * rowsPerBlock; step->data.compact.sourceNumRows = - filters_[i].extraRowCount + blockIdx * numRowsPerThread; + filters_[i].extraRowCount + blockIdx * maxRowsPerThread; } else { step->data.compact.finalRows = reinterpret_cast( blockIdx * rowsPerBlock * sizeof(int32_t)); @@ -142,7 +145,7 @@ void ReadStream::makeCompact(bool isSerial) { &step->data.compact.finalRows, false); step->data.compact.sourceNumRows = reinterpret_cast( - blockIdx * numRowsPerThread * sizeof(int32_t)); + blockIdx * maxRowsPerThread * sizeof(int32_t)); deviceStaging_.registerPointer( filters_[i].extraRowCountId, &step->data.compact.sourceNumRows, diff --git a/velox/experimental/wave/exec/ExprKernel.cu b/velox/experimental/wave/exec/ExprKernel.cu index 799339a325b4..c97bc07a3d38 100644 --- a/velox/experimental/wave/exec/ExprKernel.cu +++ b/velox/experimental/wave/exec/ExprKernel.cu @@ -26,33 +26,6 @@ DEFINE_bool(kernel_gdb, false, "Run kernels sequentially for debugging"); namespace facebook::velox::wave { -template -__device__ inline T opFunc_kPlus(T left, T right) { - return left + right; -} - -template -__device__ __forceinline__ void binaryOpKernel( - OpFunc func, - IBinary& instr, - Operand** operands, - int32_t blockBase, - void* shared, - ErrorCode& laneStatus) { - if (!laneActive(laneStatus)) { - return; - } - T left; - T right; - if (operandOrNull(operands, instr.left, blockBase, shared, left) && - operandOrNull(operands, instr.right, blockBase, shared, right)) { - flatResult( - operands, instr.result, blockBase, shared) = func(left, right); - } else { - resultNull(operands, instr.result, blockBase, shared); - } -} - #define BINARY_TYPES(opCode, TP, OP) \ case opCode: \ binaryOpKernel( \ @@ -72,19 +45,7 @@ __global__ void oneAggregate(KernelParams params, int32_t pc, int32_t base) { __global__ void 
oneReadAggregate(KernelParams params, int32_t pc, int32_t base); -template -__global__ void onePlus(KernelParams params, int32_t pc, int32_t base) { - PROGRAM_PREAMBLE(base); - binaryOpKernel( - [](auto left, auto right) { return left + right; }, - instruction[pc]._.binary, - operands, - blockBase, - &shared->data, - laneStatus); - PROGRAM_EPILOGUE(); -} - +__global__ void onePlusBigint(KernelParams params, int32_t pc, int32_t base); template __global__ void oneLt(KernelParams params, int32_t pc, int32_t base) { PROGRAM_PREAMBLE(base); @@ -181,6 +142,7 @@ void WaveKernelStream::callOne( start = params.startPC[programIdx]; } for (auto pc = start; pc < program.size(); ++pc) { + assert(params.programs[0]->instructions != nullptr); switch (program[pc]) { case OpCode::kFilter: CALL_ONE(oneFilter, params, pc, base) @@ -193,7 +155,7 @@ void WaveKernelStream::callOne( CALL_ONE(oneReadAggregate, params, pc, base) break; case OpCode::kPlus_BIGINT: - CALL_ONE(onePlus, params, pc, base); + CALL_ONE(onePlusBigint, params, pc, base); break; case OpCode::kLT_BIGINT: CALL_ONE(oneLt, params, pc, base); diff --git a/velox/experimental/wave/exec/ExprKernel3.cu b/velox/experimental/wave/exec/ExprKernel3.cu index ed7626f689ef..56ad179c0b8c 100644 --- a/velox/experimental/wave/exec/ExprKernel3.cu +++ b/velox/experimental/wave/exec/ExprKernel3.cu @@ -39,4 +39,16 @@ __global__ void oneFilter(KernelParams params, int32_t pc, int32_t base) { PROGRAM_EPILOGUE(); } +__global__ void onePlusBigint(KernelParams params, int32_t pc, int32_t base) { + PROGRAM_PREAMBLE(base); + binaryOpKernel( + [](auto left, auto right) { return left + right; }, + instruction[pc]._.binary, + operands, + blockBase, + &shared->data, + laneStatus); + PROGRAM_EPILOGUE(); +} + } // namespace facebook::velox::wave diff --git a/velox/experimental/wave/exec/Wave.cpp b/velox/experimental/wave/exec/Wave.cpp index 633b5be5201b..a7f02f43ecc1 100644 --- a/velox/experimental/wave/exec/Wave.cpp +++ 
b/velox/experimental/wave/exec/Wave.cpp @@ -796,7 +796,7 @@ int32_t WaveStream::getOutput( VELOX_CHECK(it != launchControl_.end()); auto* control = it->second[0].get(); auto* status = control->params.status; - auto numBlocks = bits::roundUp(control->inputRows, kBlockSize) / kBlockSize; + auto numBlocks = bits::roundUp(numRows_, kBlockSize) / kBlockSize; if (operands.empty()) { return statusNumRows(status, numBlocks); } diff --git a/velox/experimental/wave/exec/WaveCore.cuh b/velox/experimental/wave/exec/WaveCore.cuh index 24874b1c9156..e1630c42e769 100644 --- a/velox/experimental/wave/exec/WaveCore.cuh +++ b/velox/experimental/wave/exec/WaveCore.cuh @@ -273,4 +273,31 @@ __device__ void __forceinline__ wrapKernel( __syncthreads(); } +template +__device__ inline T opFunc_kPlus(T left, T right) { + return left + right; +} + +template +__device__ __forceinline__ void binaryOpKernel( + OpFunc func, + IBinary& instr, + Operand** operands, + int32_t blockBase, + void* shared, + ErrorCode& laneStatus) { + if (!laneActive(laneStatus)) { + return; + } + T left; + T right; + if (operandOrNull(operands, instr.left, blockBase, shared, left) && + operandOrNull(operands, instr.right, blockBase, shared, right)) { + flatResult( + operands, instr.result, blockBase, shared) = func(left, right); + } else { + resultNull(operands, instr.result, blockBase, shared); + } +} + } // namespace facebook::velox::wave diff --git a/velox/experimental/wave/exec/tests/TableScanTest.cpp b/velox/experimental/wave/exec/tests/TableScanTest.cpp index d26000db2db2..a75a66bb64c6 100644 --- a/velox/experimental/wave/exec/tests/TableScanTest.cpp +++ b/velox/experimental/wave/exec/tests/TableScanTest.cpp @@ -43,11 +43,10 @@ struct WaveScanTestParam { std::vector waveScanTestParams() { return { - WaveScanTestParam{}, WaveScanTestParam{.numStreams = 4}, - // *** Not all size combinations work, e.eg. 
: - // WaveScanTestParam{.numStreams = 4, .batchSize = 1111}, - // WaveScanTestParam{ .numStreams = 9, .batchSize = 16500} - }; + WaveScanTestParam{}, + WaveScanTestParam{.numStreams = 4}, + WaveScanTestParam{.numStreams = 4, .batchSize = 1111}, + WaveScanTestParam{.numStreams = 9, .batchSize = 16500}}; } class TableScanTest : public virtual HiveConnectorTestBase, @@ -74,6 +73,7 @@ class TableScanTest : public virtual HiveConnectorTestBase, } void TearDown() override { + vectors_.clear(); wave::test::Table::dropAll(); HiveConnectorTestBase::TearDown(); } @@ -83,17 +83,20 @@ class TableScanTest : public virtual HiveConnectorTestBase, int32_t numVectors, int32_t vectorSize, bool notNull = true) { - auto vectors = makeVectors(type, numVectors, vectorSize); + vectors_ = makeVectors(type, numVectors, vectorSize); int32_t cnt = 0; - for (auto& vector : vectors) { + for (auto& vector : vectors_) { makeRange(vector, 1000000000, notNull); auto rn = vector->childAt(type->size() - 1)->as>(); for (auto i = 0; i < rn->size(); ++i) { rn->set(i, cnt++); } } - auto splits = makeTable("test", vectors); - createDuckDbTable(vectors); + auto splits = makeTable("test", vectors_); + createDuckDbTable(vectors_); + if (dumpData_) { + toFile(); + } return splits; } @@ -196,10 +199,22 @@ class TableScanTest : public virtual HiveConnectorTestBase, ASSERT_EQ(n, task->numFinishedDrivers()); } + FOLLY_NOINLINE void toFile() { + std::ofstream out("/tmp/file.txt"); + int32_t row = 0; + for (auto i = 0; i < vectors_.size(); ++i) { + out << "\n\n*** " << row; + out << vectors_[i]->toString(0, vectors_[i]->size(), "\n", true); + } + out.close(); + } + VectorFuzzer::Options options_; std::unique_ptr fuzzer_; int32_t numBatches_ = 3; int32_t batchSize_ = 20'000; + std::vector vectors_; + bool dumpData_{false}; }; TEST_P(TableScanTest, basic) { diff --git a/velox/experimental/wave/vector/WaveVector.cpp b/velox/experimental/wave/vector/WaveVector.cpp index 486020bb10d0..f9db77949df8 100644 --- 
a/velox/experimental/wave/vector/WaveVector.cpp +++ b/velox/experimental/wave/vector/WaveVector.cpp @@ -202,26 +202,33 @@ VectorPtr WaveVector::toVelox( int32_t numBlocks, const BlockStatus* status, const Operand* operand) { + auto operandIndices = operand->indices; + // If there is a wrap, any row can be referenced, so the vector is + // with size_ of WaveVector. If there is no wrap, it is mapped to + // the end of the last BlockStatus. If the blocks are densely filled + // we have the vector at right size with no wrap on top. + int32_t filledSize = operandIndices + ? size_ + : (kBlockSize * (numBlocks - 1)) + status[numBlocks - 1].numRows; auto base = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( - toVeloxTyped, type_->kind(), size_, pool, type_, values_, nulls_); + toVeloxTyped, type_->kind(), filledSize, pool, type_, values_, nulls_); if (!status || !operand) { return base; } // Translate the BlockStatus and indices in Operand to a host side dictionary // wrap. - int maxRow = std::min(size_, numBlocks * kBlockSize); + int maxRow = std::min(filledSize, numBlocks * kBlockSize); numBlocks = bits::roundUp(maxRow, kBlockSize) / kBlockSize; int numActive = statusNumRows(status, numBlocks); - auto operandIndices = operand->indices; if (!operandIndices) { // Vector sizes are >= active in status because they are allocated before // the row count in status becomes known. VELOX_CHECK_LE( numActive, size_, - "If there is no indirection in Operand, vector size must be <= BlockStatus"); - // If all blocks except last are filled we return base without wrap. + "If there is no indirection in Operand, vector size must be >= BlockStatus"); + // If all blocks except last are full we return base without wrap. if (isDenselyFilled(status, numBlocks)) { return base; }