27 changes: 23 additions & 4 deletions src/v/datalake/serde_parquet_writer.cc
@@ -10,16 +10,33 @@

namespace datalake {

writer_error serde_parquet_writer::set_error(writer_error e) {
_error = e;
return _error;
}

ss::future<writer_error> serde_parquet_writer::add_data_struct(
iceberg::struct_value value, size_t, ss::abort_source& as) {
// This method should always return `writer_error::ok` if
// `_writer.write_row(...)` is successful. However, we still want to convey
// memory and disk reservation errors that happen after the row write.
// Hence, the current solution is to return those errors on the subsequent
// call to `add_data_struct`.
//
// Similar to `local_parquet_file_writer`, once an error has occurred,
// further writes are prevented.
if (_error != writer_error::ok) {
co_return _error;
}

auto conversion_result = co_await to_parquet_value(
std::make_unique<iceberg::struct_value>(std::move(value)));
if (conversion_result.has_error()) {
vlog(
datalake_log.warn,
"Error converting iceberg struct to parquet value - {}",
conversion_result.error());
co_return writer_error::parquet_conversion_error;
co_return set_error(writer_error::parquet_conversion_error);
}

auto group = std::get<serde::parquet::group_value>(
@@ -41,7 +58,8 @@ ss::future<writer_error> serde_parquet_writer::add_data_struct(
auto result = co_await disk.reserve_bytes(
new_total_bytes - total_bytes, as);
if (result != reservation_error::ok) {
co_return map_to_writer_error(result);
set_error(map_to_writer_error(result));
co_return writer_error::ok;
}
} else if (new_total_bytes < total_bytes) {
auto& disk = _mem_tracker.disk();
Expand All @@ -56,7 +74,8 @@ ss::future<writer_error> serde_parquet_writer::add_data_struct(
auto reservation_result = co_await _mem_tracker.reserve_bytes(
new_buffered_bytes - _buffered_bytes, as);
if (reservation_result != reservation_error::ok) {
co_return map_to_writer_error(reservation_result);
set_error(map_to_writer_error(reservation_result));
co_return writer_error::ok;
}
} else if (new_buffered_bytes < _buffered_bytes) {
// underlying writer may choose to compress data when
@@ -71,7 +90,7 @@ ss::future<writer_error> serde_parquet_writer::add_data_struct(
datalake_log.warn,
"Error writing parquet row - {}",
std::current_exception());
co_return writer_error::file_io_error;
co_return set_error(writer_error::file_io_error);
}
co_return writer_error::ok;
}
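The change above gives the writer sticky-error semantics: a reservation failure that happens after a row has already been written is recorded, the current call still reports success, and the stored error is surfaced on the next call. A minimal, self-contained sketch of that pattern (illustrative names only, not the real coroutine-based datalake types):

// Minimal sketch of the deferred ("sticky") error pattern introduced above.
#include <cstdio>

enum class writer_error { ok, out_of_memory, file_io_error };

class sticky_error_writer {
public:
    writer_error add_row(bool reservation_fails) {
        // Once an error has been recorded, further writes are prevented.
        if (_error != writer_error::ok) {
            return _error;
        }
        // ... the row itself is written successfully here ...
        if (reservation_fails) {
            // The row already made it into the file, so report `ok` now and
            // surface the reservation failure on the next call.
            set_error(writer_error::out_of_memory);
        }
        return writer_error::ok;
    }

private:
    writer_error set_error(writer_error e) { return _error = e; }
    writer_error _error{writer_error::ok};
};

int main() {
    sticky_error_writer w;
    // First call: the row was written, so the result is `ok` even though the
    // post-write reservation failed.
    std::printf("%d\n", static_cast<int>(w.add_row(/*reservation_fails=*/true)));
    // Second call: the deferred out_of_memory error is returned.
    std::printf("%d\n", static_cast<int>(w.add_row(false)));
}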
3 changes: 3 additions & 0 deletions src/v/datalake/serde_parquet_writer.h
@@ -27,6 +27,9 @@ class serde_parquet_writer : public parquet_ostream {
writer_mem_tracker& _mem_tracker;
int64_t _buffered_bytes{0};
int64_t _flushed_bytes{0};
// Stores any error that occurs after a successful row write.
writer_error _error{writer_error::ok};
writer_error set_error(writer_error);
};

class serde_parquet_writer_factory : public parquet_ostream_factory {
34 changes: 34 additions & 0 deletions src/v/datalake/tests/serde_parquet_writer_test.cc
@@ -32,3 +32,37 @@ TEST(SerdeParquetWriterTest, CheckIfTheWriterWritesData) {
ASSERT_EQ(finish_result, datalake::writer_error::ok);
ASSERT_GT(target.size_bytes(), 0);
}

TEST(SerdeParquetWriterTest, ValidateWriterBehaviorOnOOM) {
auto schema = test_schema(iceberg::field_required::no);
iobuf target;

datalake::noop_mem_tracker mem_tracker;
auto writer = datalake::serde_parquet_writer_factory{}
.create_writer(
schema, make_iobuf_ref_output_stream(target), mem_tracker)
.get();

auto v = iceberg::tests::make_value(
iceberg::tests::value_spec{.null_pct = 50},
iceberg::field_type{std::move(schema)});

auto s_v = std::get<std::unique_ptr<iceberg::struct_value>>(std::move(v));

ss::abort_source as;
mem_tracker.inject_oom_on_next_reserve();
auto result = writer->add_data_struct(std::move(*s_v), 0, as).get();

auto finish_result = writer->finish().get();
ASSERT_EQ(finish_result, datalake::writer_error::ok);

if (target.size_bytes() > 0) {
// If the value was written despite the OOM, the writer is required
// to return `ok`.
ASSERT_EQ(result, datalake::writer_error::ok);
} else {
// Otherwise the result should be some error.
ASSERT_NE(result, datalake::writer_error::ok);
}
}
9 changes: 8 additions & 1 deletion src/v/datalake/tests/test_data_writer.h
@@ -25,14 +25,20 @@ class noop_mem_tracker : public writer_mem_tracker {
public:
ss::future<reservation_error>
reserve_bytes(size_t, ss::abort_source&) noexcept override {
return ss::make_ready_future<reservation_error>(reservation_error::ok);
if (std::exchange(_oom_on_next_reserve, false)) {
co_return reservation_error::out_of_memory;
} else {
co_return reservation_error::ok;
}
}
ss::future<> free_bytes(size_t, ss::abort_source&) override {
return ss::make_ready_future<>();
}
void release() override {}
writer_disk_tracker& disk() override { return _disk; }

void inject_oom_on_next_reserve() { _oom_on_next_reserve = true; }

private:
class noop_disk_tracker : public writer_disk_tracker {
public:
@@ -49,6 +55,7 @@ class noop_mem_tracker : public writer_mem_tracker {
};

noop_disk_tracker _disk;
bool _oom_on_next_reserve{false};
};

class test_data_writer : public parquet_file_writer {
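The injection hook above relies on std::exchange to arm exactly one failure: the flag is read and cleared in a single expression, so only the first reserve_bytes call after arming fails. A standalone sketch of that one-shot idiom (illustrative types, not the Redpanda tracker):

// One-shot fault injection via std::exchange: reading the flag also disarms
// it, so exactly one operation fails per arm() call.
#include <cassert>
#include <utility>

struct one_shot_fault {
    void arm() { _armed = true; }
    // Returns true exactly once after arm(), then reverts to false.
    bool should_fail() { return std::exchange(_armed, false); }

private:
    bool _armed{false};
};

int main() {
    one_shot_fault f;
    assert(!f.should_fail()); // not armed: no failure
    f.arm();
    assert(f.should_fail());  // first call after arming fails
    assert(!f.should_fail()); // the same call cleared the flag
}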
13 changes: 11 additions & 2 deletions src/v/datalake/tests/translation_task_test.cc
@@ -345,8 +345,17 @@ TEST_F(TranslateTaskTest, TestCleanupAfterOOMError) {
as)
.get();

ASSERT_TRUE(result.has_error());
ASSERT_EQ(result.error(), datalake::translation_task::errc::no_data);
if (result.has_error()) {
// There are two ways a writer can handle an OOM. If it allocates the
// memory units prior to writing the record, there should be no data
// when flushed.
ASSERT_EQ(result.error(), datalake::translation_task::errc::no_data);
} else {
// Otherwise, if it allocates the memory units after writing the
// record, only a single record should have been written, at which
// point the OOM error would have bubbled up to the translator.
ASSERT_EQ(result.value().last_offset, kafka::offset{0});
}

// check no data files are left behind
ASSERT_THAT(list_data_files().get(), IsEmpty());
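The relaxed assertions reflect that both reservation orderings are valid writer behavior. A schematic of the two contracts the test now tolerates (hypothetical free functions; the assumed semantics come from the comments above):

// Schematic of the two OOM behaviors the test accepts. Both functions are
// hypothetical; they only model where the reservation sits relative to the
// row write when an OOM is injected.
#include <cassert>

enum class flush_outcome { no_data, one_record };

// Reservation *before* the write: the injected OOM aborts the call before
// any bytes reach the file, so a flush finds no data (errc::no_data).
flush_outcome reserve_then_write() {
    bool reservation_ok = false; // injected OOM
    if (!reservation_ok) {
        return flush_outcome::no_data;
    }
    return flush_outcome::one_record;
}

// Reservation *after* the write (the serde writer's behavior above): the row
// lands first and the error is deferred, so exactly one record exists when
// the OOM surfaces on the next call (last_offset == kafka::offset{0}).
flush_outcome write_then_reserve() {
    // ... row written successfully here, then the reservation fails ...
    return flush_outcome::one_record;
}

int main() {
    assert(reserve_then_write() == flush_outcome::no_data);
    assert(write_then_reserve() == flush_outcome::one_record);
}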