Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions velox/experimental/cudf/exec/Utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,7 @@ std::unique_ptr<cudf::table> getConcatenatedTable(
// the wrong stream.
auto output = cudf::concatenate(tableViews, stream, mr);

// Order input deallocations after the concatenate read.
// Since memory resources are stream-ordered, deallocations
// on inputStreams will be ordered after the concatenate completes.
CudaEvent event(cudaEventDisableTiming);
streamsWaitForStream(event, inputStreams, stream);
orderCudfVectorDeallocationsAfterStream(tables, inputStreams, stream);
// Input tables are deallocated here when 'tables' goes out of scope.
return output;
}
Expand Down Expand Up @@ -168,20 +164,15 @@ std::vector<std::unique_ptr<cudf::table>> getConcatenatedTableBatched(
stream,
mr));
}
// Order input deallocations after the concatenate reads by making all input
// streams wait for the output stream.
// Since memory resources are stream-ordered, deallocations
// on inputStreams will be ordered after the concatenate completes.
CudaEvent event(cudaEventDisableTiming);
streamsWaitForStream(event, inputStreams, stream);
orderCudfVectorDeallocationsAfterStream(tables, inputStreams, stream);

// Input tables are deallocated here when 'tables' goes out of scope.
return outputTables;
}

void streamsWaitForStream(
CudaEvent& event,
const std::vector<rmm::cuda_stream_view>& streams,
std::span<const rmm::cuda_stream_view> streams,
rmm::cuda_stream_view stream) {
event.recordFrom(stream);
for (const auto& strm : streams) {
Expand Down Expand Up @@ -216,4 +207,20 @@ const CudaEvent& CudaEvent::waitOn(rmm::cuda_stream_view stream) const {
return *this;
}

void orderCudfVectorDeallocationsAfterStream(
std::span<const CudfVectorPtr> vectors,
std::span<const rmm::cuda_stream_view> inputStreams,
rmm::cuda_stream_view stream) {
bool allRebound = true;
for (const auto& vector : vectors) {
VELOX_CHECK_NOT_NULL(vector);
allRebound &= vector->rebindStream(stream);
}

if (!allRebound) {
CudaEvent event(cudaEventDisableTiming);
Copy link
Copy Markdown
Collaborator

@karthikeyann karthikeyann Jun 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @bdice @vyasr please suggest an idea here.
Event creation is costly. The join_stream in cudf uses a thread_local static to avoid creating repeatedly.

https://github.com/rapidsai/cudf/blob/3b337d749d13bc8ab3a481cf09d1b5a66dd136c9/cpp/src/utilities/stream_pool.cpp#L96

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we just want to do the same thing for Velox that we're doing in libcudf? Seems like a similar use case.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you like me to propose a version that does this? In the same PR branch?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The libcudf solution seems fine. I don't see any issues using a TLS event for this use case.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think so. Let's adopt that TLS event solution here.

streamsWaitForStream(event, inputStreams, stream);
}
}

} // namespace facebook::velox::cudf_velox
16 changes: 15 additions & 1 deletion velox/experimental/cudf/exec/Utilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <rmm/cuda_stream_view.hpp>

#include <memory>
#include <span>

namespace facebook::velox::cudf_velox {

Expand Down Expand Up @@ -178,6 +179,19 @@ class CudaEvent {
*/
void streamsWaitForStream(
CudaEvent& event,
const std::vector<rmm::cuda_stream_view>& streams,
std::span<const rmm::cuda_stream_view> streams,
rmm::cuda_stream_view stream);

/**
* @brief Orders CudfVector deallocations after work on a target stream.
*
* Prefer rebinding owned table buffers to @p stream so stream-ordered memory
* resources free the inputs after prior work on that stream. Falls back to an
* event wait on @p inputStreams when an input cannot be rebound without
* materializing, e.g. packed-table inputs or older cuDF builds.
*/
void orderCudfVectorDeallocationsAfterStream(
std::span<const CudfVectorPtr> vectors,
std::span<const rmm::cuda_stream_view> inputStreams,
rmm::cuda_stream_view stream);
} // namespace facebook::velox::cudf_velox
33 changes: 33 additions & 0 deletions velox/experimental/cudf/vector/CudfVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@
#include <cudf/column/column.hpp>
#include <cudf/table/table.hpp>

#if __has_include(<cudf/column/column_stream.hpp>)
Copy link
Copy Markdown
Collaborator

@bdice bdice May 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this for backwards compatibility with older cudf versions? We should unconditionally include and use this feature, and update our pinned cudf commit if needed to support it. Run scripts/update-cudf-deps.sh --branch main.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct, the intention was backwards compatibility. It seems upgrading the dependencies would be a larger change? Or, do we prefer to serialize these changes so we don't need to maintain conditional includes? This Use-after-free PR could be broken out to not include rebind_stream, if it we want to concurrently fix the UAF and update dependencies. (Since I'm assuming updating dependencies is a full test / regression cycle?)

Copy link
Copy Markdown
Collaborator

@karthikeyann karthikeyann Jun 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

open PR #17572 is upgrading cudf commit.
This could PR could follow after that PR merges.

Copy link
Copy Markdown
Collaborator

@bdice bdice Jun 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delayed response.

If you need a new feature/fix, please use scripts/update-cudf-deps.sh to update. As long as cuDF support is experimental, we are fine letting the version requirement float forward when updates are needed. Eventually we will need to establish a version support policy.

I proposed in #17572 (really in zoltan#1) to adopt the latest commits from --branch release/26.06 because the 26.08 changes in main aren't compatible until we land some form of CMake 4.0 update, which is in progress in #17637.

Copy link
Copy Markdown
Author

@kjmph kjmph Jun 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I traced through this. I started this on an older branch, and migrated it to the newest branch. The commit in this PR is still valid, but in the intervening time cuDF was upgraded with this subsequent commit:
54bac93

Which upgraded the cuDF dependency to d09d10d14d3ed932b8de93638809101af5c7fec3. If I check that version I see that it does have rebind_stream:
https://github.com/rapidsai/cudf/blob/d09d10d14d3ed932b8de93638809101af5c7fec3/cpp/include/cudf/column/column_stream.hpp#L21-L40

So, perhaps we can remove this backwards compatibility shim now, and not serialize this PR behind another upgrade cycle?

Copy link
Copy Markdown
Collaborator

@bdice bdice Jun 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's correct. We should be clear to remove this compatibility shim now that we are pinning a newer cudf commit.

#include <cudf/column/column_stream.hpp>
#define VELOX_CUDF_HAS_COLUMN_REBIND_STREAM 1
#else
#define VELOX_CUDF_HAS_COLUMN_REBIND_STREAM 0
#endif

namespace facebook::velox::cudf_velox {
namespace {

Expand Down Expand Up @@ -175,6 +182,32 @@ std::unique_ptr<cudf::table> CudfVector::release() {
return materializedTable;
}

bool CudfVector::rebindStream(rmm::cuda_stream_view stream) {
if (stream_.value() == stream.value()) {
return true;
}

#if VELOX_CUDF_HAS_COLUMN_REBIND_STREAM
if (auto* tablePtr =
std::get_if<std::unique_ptr<cudf::table>>(&tableStorage_)) {
if (!*tablePtr) {
Comment on lines +191 to +193
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we ask libcudf for rebind_stream for packed_table or packed_column too?
Open up an issue in cudf repo.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If yes; It is possible to implement it here. But I suggest to ask libcudf to provide one.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that's a good observation. I recommend that we implement that in libcudf.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! It seems that this is a miss in the proposed PR, we don't handle packed_table correctly. Irrespective of libcudf support, would we accept this change?

bool CudfVector::rebindStream(rmm::cuda_stream_view stream) {
  if (auto* tablePtr =
          std::get_if<std::unique_ptr<cudf::table>>(&tableStorage_)) {
    if (!*tablePtr) {
      return false;
    }

    if (stream_.value() == stream.value()) {
      return true;
    }

#if VELOX_CUDF_HAS_COLUMN_REBIND_STREAM
    auto columns = (*tablePtr)->release();
    for (auto& column : columns) {
      column = cudf::rebind_stream(std::move(*column), stream);
    }

    *tablePtr = std::make_unique<cudf::table>(std::move(columns));
    tabView_ = (*tablePtr)->view();
    stream_ = stream;
    return true;
#else
    return false;
#endif
  }

  if (auto* packedPtr =
          std::get_if<std::unique_ptr<cudf::packed_table>>(&tableStorage_)) {
    if (!*packedPtr) {
      return false;
    }

    (*packedPtr)->data.gpu_data->set_stream(stream);
    stream_ = stream;
    return true;
  }

  return false;
}

To ensure this is correct, I created two new test cases, rebindOwnedTableDeallocationStream and rebindPackedTableDeallocationStream. I can verify that the second test case failed previously, but with the above version of CudfVector::rebindStream, it passes. I can add these two tests to this PR, if that's reasonable?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Side note, see the other discussion, maybe we ran remove the #if / #else)

Copy link
Copy Markdown
Collaborator

@bdice bdice Jun 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would we accept this change?

Great idea. Yes, this seems reasonable.

And yes, please add those tests. As noted above, we can assume VELOX_CUDF_HAS_COLUMN_REBIND_STREAM now.

return false;
}

auto columns = (*tablePtr)->release();
for (auto& column : columns) {
column = cudf::rebind_stream(std::move(*column), stream);
}

*tablePtr = std::make_unique<cudf::table>(std::move(columns));
tabView_ = (*tablePtr)->view();
Comment thread
kjmph marked this conversation as resolved.
stream_ = stream;
return true;
}
#endif
return false;
}

uint64_t CudfVector::estimateFlatSize() const {
return flatSize_;
}
Expand Down
5 changes: 5 additions & 0 deletions velox/experimental/cudf/vector/CudfVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ class CudfVector : public RowVector {
/// first (which copies the data).
std::unique_ptr<cudf::table> release();

/// Rebinds owned table buffers to use 'stream' for future deallocation.
/// Returns false when the storage cannot be rebound without materializing or
/// when the cuDF rebind API is unavailable.
bool rebindStream(rmm::cuda_stream_view stream);

uint64_t estimateFlatSize() const override;

private:
Expand Down
14 changes: 9 additions & 5 deletions velox/experimental/ucx-exchange/UcxPartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
#include "velox/core/QueryConfig.h"
#include "velox/exec/Driver.h"
#include "velox/exec/Operator.h"
#include "velox/experimental/cudf/exec/Utilities.h"
#include "velox/experimental/cudf/vector/CudfVector.h"

#include <cudf/concatenate.hpp>
#include <cudf/contiguous_split.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/partitioning.hpp>

using namespace facebook::velox::cudf_velox;
Expand Down Expand Up @@ -128,24 +130,26 @@ void UcxPartitionedOutput::flushPending() {
? cv->getTableView()
: cv->getTableView().select(remap_.begin(), remap_.end());
} else {
// Sync all input streams so their GPU data is ready to read.
for (auto& v : pendingInputs_) {
v->stream().synchronize();
}

// Collect (remapped) table views.
std::vector<cudf::table_view> views;
std::vector<rmm::cuda_stream_view> inputStreams;
views.reserve(pendingInputs_.size());
inputStreams.reserve(pendingInputs_.size());
for (auto& v : pendingInputs_) {
inputStreams.push_back(v->stream());
views.push_back(
remap_.empty()
? v->getTableView()
: v->getTableView().select(remap_.begin(), remap_.end()));
}

cudf::detail::join_streams(inputStreams, stream);
mergedTable = cudf::concatenate(
views, stream, cudf::get_current_device_resource_ref());

orderCudfVectorDeallocationsAfterStream(
pendingInputs_, inputStreams, stream);

// Free input GPU memory before partitioning (peak = 2x -> 1x).
pendingInputs_.clear();

Expand Down