Skip to content

Commit

Permalink
Test different Wave batch sizes (facebookincubator#10458)
Browse files Browse the repository at this point in the history
Summary:
Tests and fixes special cases with partially filled ends of batches with multiple concurrent streams per driver.

Adds an optional guard for ends of wave::Buffers and internal consistency checks for GpuArena. These may catch writes past end.

Pull Request resolved: facebookincubator#10458

Reviewed By: Yuhta

Differential Revision: D59713759

Pulled By: oerling

fbshipit-source-id: 9b17ffc947f54fd2cd9a20b87e6764c6c6b91542
  • Loading branch information
Orri Erling authored and facebook-github-bot committed Jul 16, 2024
1 parent 0adc62e commit fdb0f9b
Show file tree
Hide file tree
Showing 13 changed files with 221 additions and 77 deletions.
8 changes: 6 additions & 2 deletions velox/exec/tests/utils/QueryAssertions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
using facebook::velox::duckdb::duckdbTimestampToVelox;
using facebook::velox::duckdb::veloxTimestampToDuckDB;

DEFINE_int32(max_error_rows, 10, "Max number of listed error rows");

namespace facebook::velox::exec::test {
namespace {

Expand Down Expand Up @@ -676,7 +678,8 @@ std::string makeErrorMessage(
message << extraRows.size() << " extra rows, " << missingRows.size()
<< " missing rows" << std::endl;

auto extraRowsToPrint = std::min((size_t)10, extraRows.size());
auto extraRowsToPrint =
std::min((size_t)FLAGS_max_error_rows, extraRows.size());
message << extraRowsToPrint << " of extra rows:" << std::endl;

for (int32_t i = 0; i < extraRowsToPrint; i++) {
Expand All @@ -686,7 +689,8 @@ std::string makeErrorMessage(
}
message << std::endl;

auto missingRowsToPrint = std::min((size_t)10, missingRows.size());
auto missingRowsToPrint =
std::min((size_t)FLAGS_max_error_rows, missingRows.size());
message << missingRowsToPrint << " of missing rows:" << std::endl;
for (int32_t i = 0; i < missingRowsToPrint; i++) {
message << "\t";
Expand Down
32 changes: 32 additions & 0 deletions velox/experimental/wave/common/Buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,43 @@
*/

#include "velox/experimental/wave/common/Buffer.h"
#include "velox/common/base/Exceptions.h"
#include "velox/common/base/SuccinctPrinter.h"
#include "velox/experimental/wave/common/Exception.h"
#include "velox/experimental/wave/common/GpuArena.h"

DECLARE_bool(wave_buffer_end_guard);

namespace facebook::velox::wave {

// Verifies the end-guard magic word at the tail of the allocation. No-op
// unless --wave_buffer_end_guard is set (the guard word is only written,
// and space for it only reserved, when the flag is on). Throws on overrun.
void Buffer::check() const {
  if (FLAGS_wave_buffer_end_guard && *magicPtr() != kMagic) {
    VELOX_FAIL("Buffer tail overrun: {}", toString());
  }
}

// Stamps the end-guard magic word just past 'capacity_' bytes. No-op unless
// --wave_buffer_end_guard is set; check() later validates this word.
void Buffer::setMagic() {
  if (FLAGS_wave_buffer_end_guard) {
    *magicPtr() = kMagic;
  }
}

// One-line diagnostic summary of this buffer: data pointer, usable capacity,
// reference and pin counts, and the optional debug label set via
// setDebugInfo(). Used in failure messages, e.g. from check().
std::string Buffer::toString() const {
  return fmt::format(
      "<Buffer {} capacity={} ref={} pin={} dbg={}>",
      ptr_,
      capacity_,
      referenceCount_,
      pinCount_,
      debugInfo_);
}

void Buffer::release() {
check();
if (referenceCount_.fetch_sub(1) == 1) {
arena_->free(this);
}
Expand Down
24 changes: 24 additions & 0 deletions velox/experimental/wave/common/Buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <boost/intrusive_ptr.hpp>
#include <atomic>
#include <cstdint>
#include <string>

namespace facebook::velox::wave {

Expand Down Expand Up @@ -75,7 +76,29 @@ class Buffer {

virtual void release();

/// Returns the debug label previously attached with setDebugInfo(); empty
/// by default.
const std::string& debugInfo() const {
  return debugInfo_;
}

/// Attaches a free-form debug label to this buffer; it is included in
/// toString() output. Sink parameter: taken by value and moved into place.
void setDebugInfo(std::string info) {
  debugInfo_ = std::move(info);
}

/// Checks consistency of magic numbers. Throws on error.
void check() const;

std::string toString() const;

protected:
static constexpr int64_t kMagic = 0xe0be0be0be0be0b;

// Address of the end-guard word: the 8 bytes immediately after the
// 'capacity_' usable bytes at 'ptr_'. Only dereferenceable when the
// allocation was padded for the guard (see GpuSlab::roundBytes, which adds
// sizeof(int64_t) when the guard flag is on).
int64_t* magicPtr() const {
  return reinterpret_cast<int64_t*>(
      reinterpret_cast<char*>(ptr_) + capacity_);
}

void setMagic();

// Number of WaveBufferPtrs referencing 'this'.
std::atomic<int32_t> referenceCount_{0};

Expand All @@ -97,6 +120,7 @@ class Buffer {
// The containing arena.
GpuArena* arena_{nullptr};

std::string debugInfo_;
friend class GpuArena;
};

Expand Down
59 changes: 50 additions & 9 deletions velox/experimental/wave/common/GpuArena.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,17 @@
#include "velox/common/base/SuccinctPrinter.h"
#include "velox/experimental/wave/common/Exception.h"

DEFINE_bool(
wave_buffer_end_guard,
false,
"Use buffer end guard to detect overruns");

namespace facebook::velox::wave {

// Returns the number of bytes actually reserved for a request of 'bytes':
// widened by one guard word when --wave_buffer_end_guard is set, then
// rounded up to a power of two with a 16-byte minimum.
uint64_t GpuSlab::roundBytes(uint64_t bytes) {
  const uint64_t withGuard =
      FLAGS_wave_buffer_end_guard ? bytes + sizeof(int64_t) : bytes;
  return bits::nextPowerOfTwo(std::max<int64_t>(16, withGuard));
}

Expand Down Expand Up @@ -287,7 +295,7 @@ GpuArena::GpuArena(uint64_t singleArenaCapacity, GpuAllocator* allocator)
currentArena_ = arena;
}

WaveBufferPtr GpuArena::getBuffer(void* ptr, size_t size) {
WaveBufferPtr GpuArena::getBuffer(void* ptr, size_t capacity, size_t size) {
auto result = firstFreeBuffer_;
if (!result) {
allBuffers_.push_back(std::make_unique<Buffers>());
Expand All @@ -303,39 +311,41 @@ WaveBufferPtr GpuArena::getBuffer(void* ptr, size_t size) {
result->arena_ = this;
result->ptr_ = ptr;
result->size_ = size;
result->capacity_ = size;
result->capacity_ = capacity;
result->setMagic();
return result;
}

WaveBufferPtr GpuArena::allocateBytes(uint64_t bytes) {
bytes = GpuSlab::roundBytes(bytes);
auto roundedBytes = GpuSlab::roundBytes(bytes);
std::lock_guard<std::mutex> l(mutex_);
auto* result = currentArena_->allocate(bytes);
auto* result = currentArena_->allocate(roundedBytes);
if (result != nullptr) {
return getBuffer(result, bytes);
return getBuffer(result, bytes, roundedBytes);
}
for (auto pair : arenas_) {
if (pair.second == currentArena_ || pair.second->freeBytes() < bytes) {
if (pair.second == currentArena_ ||
pair.second->freeBytes() < roundedBytes) {
continue;
}
result = pair.second->allocate(bytes);
if (result) {
currentArena_ = pair.second;
return getBuffer(result, bytes);
return getBuffer(result, bytes, roundedBytes);
}
}

// If first allocation fails we create a new GpuSlab for another attempt. If
// it ever fails again then it means requested bytes is larger than a single
// GpuSlab's capacity. No further attempts will happen.
auto arenaBytes = std::max<uint64_t>(singleArenaCapacity_, bytes);
auto arenaBytes = std::max<uint64_t>(singleArenaCapacity_, roundedBytes);
auto newArena = std::make_shared<GpuSlab>(
allocator_->allocate(arenaBytes), arenaBytes, allocator_);
arenas_.emplace(reinterpret_cast<uint64_t>(newArena->address()), newArena);
currentArena_ = newArena;
result = currentArena_->allocate(bytes);
if (result) {
return getBuffer(result, bytes);
return getBuffer(result, bytes, roundedBytes);
}
VELOX_FAIL("Failed to allocate {} bytes of universal address space", bytes);
}
Expand Down Expand Up @@ -364,4 +374,35 @@ void GpuArena::free(Buffer* buffer) {
firstFreeBuffer_ = buffer;
}

// Validates every live Buffer in the arena: runs each buffer's end-guard
// check() and verifies that no two live allocations overlap. Returns the
// aggregate count, capacity and allocated byte totals. Throws (VELOX_FAIL)
// on corruption. Holds 'mutex_' for the duration.
ArenaStatus GpuArena::checkBuffers() {
  ArenaStatus status;
  std::lock_guard<std::mutex> l(mutex_);
  std::vector<Buffer*> usedBuffers;
  for (auto& buffers : allBuffers_) {
    for (auto& buffer : buffers->buffers) {
      // A zero reference count means the handle is on the free list.
      if (buffer.referenceCount_) {
        ++status.numBuffers;
        status.capacity += buffer.capacity_;
        status.allocatedBytes += buffer.size_;
        buffer.check();
        usedBuffers.push_back(&buffer);
      }
    }
  }
  // Sort by start address so overlap detection only needs adjacent pairs.
  std::sort(
      usedBuffers.begin(),
      usedBuffers.end(),
      [](const Buffer* left, const Buffer* right) {
        return reinterpret_cast<uintptr_t>(left->ptr_) <
            reinterpret_cast<uintptr_t>(right->ptr_);
      });
  // 'i + 1 < size()' instead of 'i < size() - 1': with no live buffers,
  // 'size() - 1' underflows to SIZE_MAX and the old form indexed past the
  // end of an empty vector.
  for (size_t i = 0; i + 1 < usedBuffers.size(); ++i) {
    void* end =
        reinterpret_cast<char*>(usedBuffers[i]->ptr_) + usedBuffers[i]->size_;
    if (end > usedBuffers[i + 1]->ptr_) {
      VELOX_FAIL("Overlapping buffers in GpuArena");
    }
  }
  return status;
}

} // namespace facebook::velox::wave
20 changes: 18 additions & 2 deletions velox/experimental/wave/common/GpuArena.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ class GpuSlab {
GpuAllocator* const allocator_;
};

/// Aggregate accounting of live allocations, as returned by
/// GpuArena::checkBuffers().
struct ArenaStatus {
  /// Number of Buffers with a non-zero reference count.
  int32_t numBuffers{0};

  /// Sum of Buffer capacity (usable bytes) over allocated buffers.
  int64_t capacity{};

  /// Sum of actual reserved sizes. At least 'capacity', since sizes are
  /// padded (and may include an end-guard word).
  int64_t allocatedBytes{0};
};

/// A class that manages a set of GpuSlabs. It is able to adapt itself by
/// growing the number of its managed GpuSlab's when extreme memory
/// fragmentation happens.
Expand Down Expand Up @@ -141,6 +152,10 @@ class GpuArena {
return arenas_;
}

/// Checks magic numbers and returns the sum of allocated capacity. Actual
/// sizes are padded to larger.
ArenaStatus checkBuffers();

private:
// A preallocated array of Buffer handles for memory of 'this'.
struct Buffers {
Expand All @@ -149,8 +164,9 @@ class GpuArena {
};

// Returns a new reference counting pointer to a new Buffer initialized to
// 'ptr' and 'size'.
WaveBufferPtr getBuffer(void* ptr, size_t size);
// 'ptr' and 'size'. 'size' is the size to free, 'capacity' is the usable size
// excluding magic numbers and padding.
WaveBufferPtr getBuffer(void* ptr, size_t capacity, size_t size);

// Serializes all activity in 'this'.
std::mutex mutex_;
Expand Down
5 changes: 3 additions & 2 deletions velox/experimental/wave/dwio/FormatData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ std::unique_ptr<GpuDecode> FormatData::makeStep(
WaveTypeKind columnKind,
int32_t blockIdx) {
auto rowsPerBlock = FLAGS_wave_reader_rows_per_tb;
auto maxRowsPerThread = (rowsPerBlock / kBlockSize);
int32_t numBlocks =
bits::roundUp(op.rows.size(), rowsPerBlock) / rowsPerBlock;

Expand All @@ -116,7 +117,7 @@ std::unique_ptr<GpuDecode> FormatData::makeStep(
step->nonNullBases = grid_.numNonNull;
step->nulls = grid_.nulls;
}
step->numRowsPerThread = rowsPerBlock / kBlockSize;
step->numRowsPerThread = bits::roundUp(rowsInBlock, kBlockSize) / kBlockSize;
setFilter(step.get(), op.reader, nullptr);
bool dense = previousFilter == nullptr &&
simd::isDense(op.rows.data(), op.rows.size());
Expand All @@ -136,7 +137,7 @@ std::unique_ptr<GpuDecode> FormatData::makeStep(
op.extraRowCountId, &op.extraRowCount, true);
} else {
step->filterRowCount = reinterpret_cast<int32_t*>(
blockIdx * sizeof(int32_t) * step->numRowsPerThread);
blockIdx * sizeof(int32_t) * maxRowsPerThread);
deviceStaging.registerPointer(
op.extraRowCountId, &step->filterRowCount, false);
}
Expand Down
15 changes: 9 additions & 6 deletions velox/experimental/wave/dwio/ReadStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ ReadStream::ReadStream(

void ReadStream::setBlockStatusAndTemp() {
auto* status = control_->deviceData->as<BlockStatus>();
auto maxRowsPerThread = FLAGS_wave_reader_rows_per_tb / kBlockSize;
auto tempSize = programs_.programs[0][0]->tempSize();
auto size = programs_.programs.size() * tempSize;
auto id = deviceStaging_.reserve(size);
Expand All @@ -69,7 +70,7 @@ void ReadStream::setBlockStatusAndTemp() {
for (auto& op : program) {
op->temp = reinterpret_cast<int32_t*>(blockIdx * tempSize);
deviceStaging_.registerPointer(id, &op->temp, false);
op->blockStatus = status + op->numRowsPerThread * op->nthBlock;
op->blockStatus = status + maxRowsPerThread * op->nthBlock;
}
}
}
Expand Down Expand Up @@ -119,21 +120,23 @@ void ReadStream::makeGrid(Stream* stream) {

void ReadStream::makeCompact(bool isSerial) {
auto rowsPerBlock = FLAGS_wave_reader_rows_per_tb;
auto numRowsPerThread = FLAGS_wave_reader_rows_per_tb / kBlockSize;
auto maxRowsPerThread = FLAGS_wave_reader_rows_per_tb / kBlockSize;
for (int32_t i = 0; i < static_cast<int32_t>(filters_.size()) - 1; ++i) {
if (filters_[i].waveVector) {
int32_t numTBs =
bits::roundUp(numBlocks_, numRowsPerThread) / numRowsPerThread;
bits::roundUp(numBlocks_, maxRowsPerThread) / maxRowsPerThread;
for (auto blockIdx = 0; blockIdx < numTBs; ++blockIdx) {
auto step = std::make_unique<GpuDecode>();
step->step = DecodeStep::kCompact64;
step->nthBlock = blockIdx;
step->numRowsPerThread = numRowsPerThread;
step->numRowsPerThread = blockIdx == numTBs - 1
? numBlocks_ - (numTBs - 1) * maxRowsPerThread
: maxRowsPerThread;
if (filters_.back().deviceResult) {
step->data.compact.finalRows =
filters_.back().deviceResult + blockIdx * rowsPerBlock;
step->data.compact.sourceNumRows =
filters_[i].extraRowCount + blockIdx * numRowsPerThread;
filters_[i].extraRowCount + blockIdx * maxRowsPerThread;
} else {
step->data.compact.finalRows = reinterpret_cast<int32_t*>(
blockIdx * rowsPerBlock * sizeof(int32_t));
Expand All @@ -142,7 +145,7 @@ void ReadStream::makeCompact(bool isSerial) {
&step->data.compact.finalRows,
false);
step->data.compact.sourceNumRows = reinterpret_cast<int32_t*>(
blockIdx * numRowsPerThread * sizeof(int32_t));
blockIdx * maxRowsPerThread * sizeof(int32_t));
deviceStaging_.registerPointer(
filters_[i].extraRowCountId,
&step->data.compact.sourceNumRows,
Expand Down
Loading

0 comments on commit fdb0f9b

Please sign in to comment.