Skip to content

Commit 5ecc1e9

Browse files
committed
Add C++ read_parquet test
These do the same as the python tests, but additionally run in parallel.
1 parent 0fa42b9 commit 5ecc1e9

File tree

2 files changed

+238
-0
lines changed

2 files changed

+238
-0
lines changed

cpp/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ if(RAPIDSMPF_HAVE_STREAMING)
7272
streaming/test_error_handling.cpp
7373
streaming/test_leaf_node.cpp
7474
streaming/test_partition.cpp
75+
streaming/test_read_parquet.cpp
7576
streaming/test_shuffler.cpp
7677
streaming/test_table_chunk.cpp
7778
)
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
/**
2+
* SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#include <cstdint>
7+
#include <filesystem>
8+
#include <iomanip>
9+
#include <memory>
10+
#include <optional>
11+
#include <sstream>
12+
#include <stdexcept>
13+
#include <string>
14+
#include <tuple>
15+
#include <vector>
16+
17+
#include <gtest/gtest.h>
18+
19+
#include <cudf/concatenate.hpp>
20+
#include <cudf/contiguous_split.hpp>
21+
#include <cudf/io/parquet.hpp>
22+
#include <cudf/io/types.hpp>
23+
#include <cudf/reduction.hpp>
24+
#include <cudf/table/table.hpp>
25+
#include <cudf/table/table_view.hpp>
26+
#include <cudf/types.hpp>
27+
#include <cudf_test/base_fixture.hpp>
28+
#include <cudf_test/column_wrapper.hpp>
29+
#include <cudf_test/table_utilities.hpp>
30+
#include <rmm/cuda_stream_view.hpp>
31+
#include <rmm/mr/device/per_device_resource.hpp>
32+
33+
#include <rapidsmpf/allgather/allgather.hpp>
34+
#include <rapidsmpf/buffer/packed_data.hpp>
35+
#include <rapidsmpf/integrations/cudf/partition.hpp>
36+
#include <rapidsmpf/streaming/core/channel.hpp>
37+
#include <rapidsmpf/streaming/core/leaf_node.hpp>
38+
#include <rapidsmpf/streaming/core/node.hpp>
39+
#include <rapidsmpf/streaming/cudf/owning_wrapper.hpp>
40+
#include <rapidsmpf/streaming/cudf/parquet.hpp>
41+
#include <rapidsmpf/streaming/cudf/table_chunk.hpp>
42+
43+
#include "base_streaming_fixture.hpp"
44+
45+
46+
using namespace rapidsmpf;
47+
using namespace rapidsmpf::streaming;
48+
49+
class StreamingReadParquet : public BaseStreamingFixture {
50+
protected:
51+
void SetUp() override {
52+
BaseStreamingFixture::SetUp();
53+
constexpr int nfiles = 10;
54+
constexpr int nrows = 10;
55+
56+
temp_dir = std::filesystem::temp_directory_path() / "rapidsmpf_read_parquet_test";
57+
58+
for (int i = 0; i < nfiles; ++i) {
59+
std::ostringstream filename_stream;
60+
filename_stream << std::setw(3) << std::setfill(' ') << i << ".pq";
61+
std::filesystem::path filepath = temp_dir / filename_stream.str();
62+
source_files.push_back(filepath.string());
63+
}
64+
65+
if (GlobalEnvironment->comm_->rank() == 0) {
66+
std::filesystem::create_directories(temp_dir);
67+
68+
int start = 0;
69+
for (auto& file : source_files) {
70+
auto values = std::ranges::iota_view(start, start + nrows);
71+
cudf::test::fixed_width_column_wrapper<int32_t> col(
72+
values.begin(), values.end()
73+
);
74+
75+
std::vector<std::unique_ptr<cudf::column>> columns;
76+
columns.push_back(col.release());
77+
auto table = std::make_unique<cudf::table>(std::move(columns));
78+
79+
cudf::io::sink_info sink{file};
80+
auto options =
81+
cudf::io::parquet_writer_options::builder(sink, table->view())
82+
.build();
83+
cudf::io::write_parquet(options);
84+
start += nrows + nrows / 2;
85+
}
86+
}
87+
88+
GlobalEnvironment->barrier();
89+
}
90+
91+
void TearDown() override {
92+
GlobalEnvironment->barrier();
93+
94+
if (GlobalEnvironment->comm_->rank() == 0 && std::filesystem::exists(temp_dir)) {
95+
std::filesystem::remove_all(temp_dir);
96+
}
97+
98+
BaseStreamingFixture::TearDown();
99+
}
100+
101+
[[nodiscard]] cudf::io::source_info get_source_info() const {
102+
return cudf::io::source_info(source_files);
103+
}
104+
105+
std::filesystem::path temp_dir;
106+
std::vector<std::string> source_files;
107+
};
108+
109+
class StreamingReadParquetThrottle : public StreamingReadParquet,
110+
public ::testing::WithParamInterface<int> {};
111+
112+
INSTANTIATE_TEST_SUITE_P(
113+
InvalidMaxTickets, StreamingReadParquetThrottle, ::testing::Values(-1, 0)
114+
);
115+
116+
TEST_P(StreamingReadParquetThrottle, NonPositiveThrottleThrows) {
117+
if (GlobalEnvironment->comm_->nranks() > 1) {
118+
// Test is independent of size of communicator, so don't bother if we
119+
// have more than one rank.
120+
GTEST_SKIP();
121+
}
122+
int max_tickets = GetParam();
123+
auto source = get_source_info();
124+
auto options = cudf::io::parquet_reader_options::builder(source).build();
125+
126+
auto ch = std::make_shared<Channel>();
127+
std::vector<Node> nodes;
128+
nodes.push_back(node::read_parquet(ctx, ch, max_tickets, options, 100));
129+
130+
EXPECT_THROW(run_streaming_pipeline(std::move(nodes)), std::logic_error);
131+
}
132+
133+
using ReadParquetParams = std::tuple<std::optional<int64_t>, std::optional<int64_t>>;
134+
135+
class StreamingReadParquetParams
136+
: public StreamingReadParquet,
137+
public ::testing::WithParamInterface<ReadParquetParams> {};
138+
139+
INSTANTIATE_TEST_SUITE_P(
140+
ReadParquetCombinations,
141+
StreamingReadParquetParams,
142+
::testing::Combine(
143+
// skip_rows
144+
::testing::Values(
145+
std::nullopt,
146+
std::optional<int64_t>{7},
147+
std::optional<int64_t>{19},
148+
std::optional<int64_t>{113}
149+
),
150+
// num_rows
151+
::testing::Values(
152+
std::nullopt,
153+
std::optional<int64_t>{0},
154+
std::optional<int64_t>{3},
155+
std::optional<int64_t>{31},
156+
std::optional<int64_t>{83}
157+
)
158+
),
159+
[](const ::testing::TestParamInfo<ReadParquetParams>& info) {
160+
const auto& skip_rows = std::get<0>(info.param);
161+
const auto& num_rows = std::get<1>(info.param);
162+
std::string result = "skip_rows_";
163+
result += skip_rows.has_value() ? std::to_string(skip_rows.value()) : "none";
164+
result += "_num_rows_";
165+
result += num_rows.has_value() ? std::to_string(num_rows.value()) : "all";
166+
return result;
167+
}
168+
);
169+
170+
TEST_P(StreamingReadParquetParams, ReadParquet) {
171+
auto [skip_rows, num_rows] = GetParam();
172+
auto source = get_source_info();
173+
174+
auto options = cudf::io::parquet_reader_options::builder(source).build();
175+
if (skip_rows.has_value()) {
176+
options.set_skip_rows(skip_rows.value());
177+
}
178+
if (num_rows.has_value()) {
179+
options.set_num_rows(num_rows.value());
180+
}
181+
182+
auto ch = std::make_shared<Channel>();
183+
std::vector<Node> nodes;
184+
185+
nodes.push_back(node::read_parquet(ctx, ch, 4, options, 3));
186+
187+
std::vector<Message> messages;
188+
nodes.push_back(node::pull_from_channel(ctx, ch, messages));
189+
190+
if (GlobalEnvironment->comm_->nranks() > 1
191+
&& (skip_rows.value_or(0) > 0 || num_rows.has_value()))
192+
{
193+
// We don't yet implement skip_rows/num_rows in multi-rank mode
194+
EXPECT_THROW(run_streaming_pipeline(std::move(nodes)), std::logic_error);
195+
return;
196+
}
197+
run_streaming_pipeline(std::move(nodes));
198+
199+
allgather::AllGather allgather(
200+
GlobalEnvironment->comm_,
201+
GlobalEnvironment->progress_thread_,
202+
/* op_id = */ 0,
203+
br.get()
204+
);
205+
206+
for (auto& msg : messages) {
207+
auto chunk = msg.release<TableChunk>();
208+
auto seq = chunk.sequence_number();
209+
auto [reservation, _] =
210+
br->reserve(MemoryType::DEVICE, chunk.make_available_cost(), true);
211+
chunk = chunk.make_available(reservation);
212+
auto packed_columns =
213+
cudf::pack(chunk.table_view(), chunk.stream(), br->device_mr());
214+
auto packed_data = PackedData{
215+
std::move(packed_columns.metadata),
216+
br->move(std::move(packed_columns.gpu_data), chunk.stream())
217+
};
218+
219+
allgather.insert(seq, std::move(packed_data));
220+
}
221+
222+
allgather.insert_finished();
223+
224+
// May as well check on all ranks, so we also mildly exercise the allgather.
225+
auto gathered_packed_data =
226+
allgather.wait_and_extract(allgather::AllGather::Ordered::YES);
227+
auto result = unpack_and_concat(
228+
std::move(gathered_packed_data), br->stream_pool().get_stream(), br.get()
229+
);
230+
auto expected = cudf::io::read_parquet(options).tbl;
231+
232+
EXPECT_EQ(result->num_rows(), expected->num_rows());
233+
EXPECT_EQ(result->num_columns(), expected->num_columns());
234+
EXPECT_EQ(result->num_columns(), 1);
235+
CUDF_TEST_EXPECT_TABLES_EQUIVALENT(result->view(), expected->view());
236+
}
237+
}

0 commit comments

Comments
 (0)