Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 51 additions & 12 deletions velox/experimental/cudf/exec/Utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,42 @@
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/utilities/memory_resource.hpp>

#include <cuda_runtime_api.h>

#include <limits>
#include <vector>

namespace facebook::velox::cudf_velox {
namespace {

int getNumCudaDevices() {
int numDevices{};
CUDF_CUDA_TRY(cudaGetDeviceCount(&numDevices));
return numDevices;
}

int getCurrentCudaDevice() {
int device{};
CUDF_CUDA_TRY(cudaGetDevice(&device));
return device;
}

CudaEvent& eventForThread() {
Comment thread
kjmph marked this conversation as resolved.
// Intentionally leak per-thread, per-device events to avoid CUDA calls from
// thread-local destructors after CUDA context teardown.
thread_local static std::vector<CudaEvent*> events(getNumCudaDevices());
auto const device = getCurrentCudaDevice();
VELOX_CHECK_GE(device, 0);
auto const deviceIndex = static_cast<size_t>(device);
VELOX_CHECK_LT(deviceIndex, events.size());

if (events[deviceIndex] == nullptr) {
events[deviceIndex] = new CudaEvent(cudaEventDisableTiming);
}
return *events[deviceIndex];
}

} // namespace

std::unique_ptr<cudf::table> concatenateTables(
std::vector<std::unique_ptr<cudf::table>> tables,
Expand Down Expand Up @@ -101,11 +134,7 @@ std::unique_ptr<cudf::table> getConcatenatedTable(
// the wrong stream.
auto output = cudf::concatenate(tableViews, stream, mr);

// Order input deallocations after the concatenate read.
// Since memory resources are stream-ordered, deallocations
// on inputStreams will be ordered after the concatenate completes.
CudaEvent event(cudaEventDisableTiming);
streamsWaitForStream(event, inputStreams, stream);
orderCudfVectorDeallocationsAfterStream(tables, inputStreams, stream);
// Input tables are deallocated here when 'tables' goes out of scope.
return output;
}
Expand Down Expand Up @@ -168,20 +197,15 @@ std::vector<std::unique_ptr<cudf::table>> getConcatenatedTableBatched(
stream,
mr));
}
// Order input deallocations after the concatenate reads by making all input
// streams wait for the output stream.
// Since memory resources are stream-ordered, deallocations
// on inputStreams will be ordered after the concatenate completes.
CudaEvent event(cudaEventDisableTiming);
streamsWaitForStream(event, inputStreams, stream);
orderCudfVectorDeallocationsAfterStream(tables, inputStreams, stream);

// Input tables are deallocated here when 'tables' goes out of scope.
return outputTables;
}

void streamsWaitForStream(
CudaEvent& event,
const std::vector<rmm::cuda_stream_view>& streams,
std::span<const rmm::cuda_stream_view> streams,
rmm::cuda_stream_view stream) {
event.recordFrom(stream);
for (const auto& strm : streams) {
Expand Down Expand Up @@ -216,4 +240,19 @@ const CudaEvent& CudaEvent::waitOn(rmm::cuda_stream_view stream) const {
return *this;
}

void orderCudfVectorDeallocationsAfterStream(
std::span<const CudfVectorPtr> vectors,
std::span<const rmm::cuda_stream_view> inputStreams,
rmm::cuda_stream_view stream) {
bool allRebound = true;
for (const auto& vector : vectors) {
VELOX_CHECK_NOT_NULL(vector);
allRebound &= vector->rebindStream(stream);
}

if (!allRebound) {
streamsWaitForStream(eventForThread(), inputStreams, stream);
}
}

} // namespace facebook::velox::cudf_velox
16 changes: 15 additions & 1 deletion velox/experimental/cudf/exec/Utilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <rmm/cuda_stream_view.hpp>

#include <memory>
#include <span>

namespace facebook::velox::cudf_velox {

Expand Down Expand Up @@ -178,6 +179,19 @@ class CudaEvent {
*/
void streamsWaitForStream(
CudaEvent& event,
const std::vector<rmm::cuda_stream_view>& streams,
std::span<const rmm::cuda_stream_view> streams,
rmm::cuda_stream_view stream);

/**
* @brief Orders CudfVector deallocations after work on a target stream.
*
* Prefer rebinding owned table buffers to @p stream so stream-ordered memory
* resources free the inputs after prior work on that stream. Falls back to an
* event wait on @p inputStreams when an input cannot be rebound without
* materializing, e.g. packed-table inputs or older cuDF builds.
*/
void orderCudfVectorDeallocationsAfterStream(
std::span<const CudfVectorPtr> vectors,
std::span<const rmm::cuda_stream_view> inputStreams,
rmm::cuda_stream_view stream);
} // namespace facebook::velox::cudf_velox
6 changes: 6 additions & 0 deletions velox/experimental/cudf/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ velox_add_cudf_test(
LIBS velox_cudf_exec Folly::folly
)

velox_add_cudf_test(
NAME velox_cudf_vector_test
SOURCES Main.cpp CudfVectorTest.cpp
LIBS velox_cudf_vector velox_vector_test_lib cudf::cudf
)

velox_add_cudf_test(
NAME velox_cudf_decimal_expression_test
SOURCES Main.cpp DecimalExpressionTest.cpp
Expand Down
222 changes: 222 additions & 0 deletions velox/experimental/cudf/tests/CudfVectorTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/experimental/cudf/vector/CudfVector.h"

#include "velox/common/base/Exceptions.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

#include <cudf/contiguous_split.hpp>
#include <cudf/utilities/error.hpp>

#include <rmm/device_buffer.hpp>
#include <rmm/mr/cuda_memory_resource.hpp>
#include <rmm/resource_ref.hpp>

#include <cuda/memory_resource>
#include <cuda/stream_ref>
#include <cuda_runtime_api.h>

#include <array>
#include <memory>
#include <vector>

using namespace facebook::velox;
using namespace facebook::velox::cudf_velox;
using namespace facebook::velox::test;

namespace {

class TestCudaStream {
public:
TestCudaStream() {
VELOX_CHECK_EQ(
cudaStreamCreateWithFlags(&stream_, cudaStreamNonBlocking),
cudaSuccess);
}

~TestCudaStream() {
if (stream_ != nullptr) {
cudaStreamDestroy(stream_);
}
}

rmm::cuda_stream_view view() const {
return rmm::cuda_stream_view{stream_};
}

cudaStream_t value() const {
return stream_;
}

private:
cudaStream_t stream_{nullptr};
};

struct RecordingState {
cudaStream_t lastDeallocationStream{nullptr};
int deallocationCount{0};
Comment thread
kjmph marked this conversation as resolved.
Outdated
};

class RecordingDeviceResource {
public:
void* allocate(
cuda::stream_ref stream,
std::size_t bytes,
std::size_t alignment = rmm::CUDA_ALLOCATION_ALIGNMENT) {
return upstream_.allocate(stream, bytes, alignment);
}

void deallocate(
cuda::stream_ref stream,
void* ptr,
std::size_t bytes,
std::size_t alignment = rmm::CUDA_ALLOCATION_ALIGNMENT) noexcept {
state_->lastDeallocationStream = stream.get();
++state_->deallocationCount;
upstream_.deallocate(stream, ptr, bytes, alignment);
}

void* allocate_sync(
std::size_t bytes,
std::size_t alignment = rmm::CUDA_ALLOCATION_ALIGNMENT) {
return upstream_.allocate_sync(bytes, alignment);
}

void deallocate_sync(
void* ptr,
std::size_t bytes,
std::size_t alignment = rmm::CUDA_ALLOCATION_ALIGNMENT) noexcept {
upstream_.deallocate_sync(ptr, bytes, alignment);
}

void reset() {
state_->lastDeallocationStream = nullptr;
state_->deallocationCount = 0;
}

int deallocationCount() const {
return state_->deallocationCount;
}

cudaStream_t lastDeallocationStream() const {
return state_->lastDeallocationStream;
}

bool operator==(const RecordingDeviceResource& other) const noexcept {
return state_ == other.state_;
}

private:
std::shared_ptr<RecordingState> state_{std::make_shared<RecordingState>()};
rmm::mr::cuda_memory_resource upstream_;
Comment thread
kjmph marked this conversation as resolved.
Outdated
};

void get_property(
const RecordingDeviceResource&,
cuda::mr::device_accessible) noexcept {}

std::unique_ptr<cudf::table> makeTable(
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) {
std::array<int32_t, 4> values{{1, 2, 3, 4}};
rmm::device_buffer data(values.size() * sizeof(int32_t), stream, mr);
CUDF_CUDA_TRY(cudaMemcpyAsync(
data.data(),
values.data(),
values.size() * sizeof(int32_t),
cudaMemcpyHostToDevice,
stream.value()));

std::vector<std::unique_ptr<cudf::column>> columns;
columns.push_back(std::make_unique<cudf::column>(
cudf::data_type{cudf::type_id::INT32},
static_cast<cudf::size_type>(values.size()),
std::move(data),
rmm::device_buffer{},
0));
return std::make_unique<cudf::table>(std::move(columns));
}

class CudfVectorTest : public ::testing::Test, public VectorTestBase {
protected:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{});
}
};

TEST_F(CudfVectorTest, rebindOwnedTableDeallocationStream) {
TestCudaStream allocationStream;
TestCudaStream targetStream;
RecordingDeviceResource resource;

auto table = makeTable(
allocationStream.view(),
rmm::to_device_async_resource_ref_checked(&resource));
allocationStream.view().synchronize();

auto vector = std::make_shared<CudfVector>(
pool_.get(),
ROW({"c0"}, {INTEGER()}),
table->num_rows(),
std::move(table),
allocationStream.view());
resource.reset();

ASSERT_TRUE(vector->rebindStream(targetStream.view()));
vector.reset();

EXPECT_GT(resource.deallocationCount(), 0);
EXPECT_EQ(resource.lastDeallocationStream(), targetStream.value());
}

TEST_F(CudfVectorTest, rebindPackedTableDeallocationStream) {
TestCudaStream allocationStream;
TestCudaStream targetStream;
RecordingDeviceResource resource;

auto table = makeTable(
allocationStream.view(), cudf::get_current_device_resource_ref());
auto packedColumns = cudf::pack(
table->view(),
allocationStream.view(),
rmm::to_device_async_resource_ref_checked(&resource));
allocationStream.view().synchronize();
auto tableView = cudf::unpack(packedColumns);
auto packedTable = std::make_unique<cudf::packed_table>(
cudf::packed_table{tableView, std::move(packedColumns)});

// Model the intra-node UCX path: the packed buffer was allocated on
// allocationStream, but downstream work is associated with targetStream.
// The CudfVector logical stream is already targetStream, but the packed
// buffer's deallocation stream is still allocationStream. rebindStream must
// update the packed buffer even when stream_ already matches targetStream.
auto vector = std::make_shared<CudfVector>(
pool_.get(),
ROW({"c0"}, {INTEGER()}),
packedTable->table.num_rows(),
std::move(packedTable),
targetStream.view());
resource.reset();

ASSERT_TRUE(vector->rebindStream(targetStream.view()));
vector.reset();

EXPECT_GT(resource.deallocationCount(), 0);
EXPECT_EQ(resource.lastDeallocationStream(), targetStream.value());
}

} // namespace
Loading
Loading